import collections
import pickle
import re

import gobject
import gtk
import networkx


class GOTerm:
    """A single term of the ontology, stored as a small key/value record.

    Keys listed in C{GOTerm.lists} hold lists of strings and keys listed in
    C{GOTerm.scalars} hold a single value (C{None} until it is assigned).
    Looking up a key that was never set returns C{None} instead of raising.
    """

    ## Keys that may occur several times in one term; collected in lists.
    lists = ['is_a', 'alt_id', 'exact_synonym', 'broad_synonym',
             'narrow_synonym', 'related_synonym', 'relationship',
             'subset', 'synonym', 'xref_analog', 'xref_unknown']

    ## Keys that hold a single value per term.
    scalars = ['name', 'id', 'namespace', 'def', 'is_transitive',
               'comment', 'is_obsolete']

    def __init__(self):
        self.d = {}
        ## Create empty lists for all list values
        for l in GOTerm.lists:
            self.d[l] = []
        ## ... and an explicit None for every scalar value.  (This loop
        ## used to write to the stale list key, which replaced the
        ## 'xref_unknown' list by None.)
        for s in GOTerm.scalars:
            self.d[s] = None

    def __getitem__(self, key):
        """Returns the value stored under key, or None if it is unknown."""
        return self.d.get(key)

    def __setitem__(self, key, value):
        self.d[key] = value


class GeneOntology(networkx.XDiGraph):
    """Directed graph of L{GOTerm} nodes with an index from id to term.

    Edges are added by L{link_ontology} and L{link_relationships}; they
    run from the term that is referenced to the term that references it,
    and carry the name of the link as edge data.
    """

    def __init__(self):
        networkx.XDiGraph.__init__(self)
        self.by_id = {}
        ## Undirected copy of the graph, built lazily by old_subsumer.
        self.undirected = None

    def add_term(self, term):
        """Adds term as a node and indexes it by its 'id' value."""
        self.add_node(term)
        self.by_id[term['id']] = term

    def link_ontology(self, linkattr, obsolete=False):
        """Adds an edge for every id listed under linkattr in every node.

        @param linkattr: Name of a list attribute of the terms (for
            example 'is_a') whose entries are term ids.
        @param obsolete: Currently unused; kept so existing callers that
            pass it keep working.
        @raises KeyError: If a referenced id has not been added.
        """
        for node in self.nodes():
            for link in node[linkattr]:
                self.add_edge(self.by_id[link], node, linkattr)

    def link_relationships(self):
        """Adds an edge for every 'relationship' entry of every node.

        Each entry must consist of a link type and a term id separated by
        whitespace; the link type becomes the edge data.

        @raises ValueError: If an entry does not have exactly two parts.
        @raises KeyError: If a referenced id has not been added.
        """
        for node in self.nodes():
            for link in node['relationship']:
                ## Split on any run of whitespace so that repeated
                ## spaces or tabs between the two parts are accepted.
                link_type, term = link.split()
                self.add_edge(self.by_id[term], node, link_type)

    def get_bp(self):
        """Returns the root node of the biological_process tree"""
        return self.by_id['GO:0008150']

    def get_cc(self):
        """Returns the root node of the cellular_component tree"""
        ## by_id is keyed by the bare id, exactly as in get_bp and get_mf.
        return self.by_id['GO:0005575']

    def get_mf(self):
        """Returns the root node of the molecular_function tree"""
        return self.by_id['GO:0003674']

    def _subsumer(self, t1, t2, heap):
        """Breadth first search for a node with a path to both t1 and t2.

        @param heap: The nodes to start the search from.  The sequence
            itself is not modified.
        @returns: The first node found from which both t1 and t2 can be
            reached, or None if the search is exhausted.
        """
        queue = collections.deque(heap)
        ## A node that failed the test once will fail it again, so every
        ## node is examined only once.  This also guarantees termination
        ## if the graph contains a cycle.
        seen = set()
        while queue:
            t = queue.popleft()
            if t in seen:
                continue
            seen.add(t)
            if (networkx.shortest_path(self, t, t1)
                    and networkx.shortest_path(self, t, t2)):
                return t
            queue.extend(self.in_neighbors(t))
        return None

    def subsumer(self, t1, t2):
        """Returns a term from which both t1 and t2 can be reached.

        If one of the terms can be reached from the other, that other
        term is returned.  Otherwise the search walks up the incoming
        edges of t1.

        @returns: The subsuming term, or None if none was found.
        """
        if t1 == t2:
            return t1
        if networkx.shortest_path(self, t1, t2):
            return t1
        elif networkx.shortest_path(self, t2, t1):
            return t2
        return self._subsumer(t1, t2, self.in_neighbors(t1))

    def old_subsumer(self, t1, t2):
        """Older implementation of L{subsumer} using an undirected path.

        Finds a path between t1 and t2 in an undirected copy of the graph
        and returns the first node on it from which both terms can be
        reached in the directed graph.

        @returns: The subsuming term, or None if no such term was found.
        """
        if t1 == t2:
            return t1

        if self.undirected is None:
            self.undirected = self.to_undirected()

        path = networkx.shortest_path(self.undirected, t1, t2)
        if not path:
            print("Woah, path not found.")
            return None

        ## NOTE(review): this compares the path with the literal list [1];
        ## a length check may have been intended.  Left as it was.
        if path == [1]:
            print("This shouldn't happen")
            return t1

        for t in path:
            if (networkx.shortest_path(self, t, t1)
                    and networkx.shortest_path(self, t, t2)):
                return t

        print("GeneOntology.subsumer: should not reach this point")
        print("path is now: %s" % path)
        print("ids are: %s " % [x['id'] for x in path])


## Matches 'keyword: value ! comment'; compiled once for all lines.
_ATTRIB_RE = re.compile(r'^\s*([\w-]+)\s*:\s*([^!]*)!?(.*$)')


def _split_obo_line(line):
    """Splits a line from an obo file in its constituent parts.

    @param line: A string containing a line from an obo file.  The line
        must be blank, a comment starting with '!', a section definition
        with a section name in brackets, or a line of the form:
            keyword: value ! comment
    @returns: A tuple of four values: the section, key, value and comment
        defined in the string.  For a section line only the section is
        set and the other three are None.  For a keyword line the
        section is None and the other three are strings.  For a blank or
        comment-only line all four are None.
    @raises Exception: If the line has none of the forms above.
    """
    s = line.strip()
    ## A line holding nothing but a comment carries no data.
    if s == "" or s.startswith('!'):
        return (None, None, None, None)
    if s.startswith('[') and s.endswith(']'):
        return (s[1:-1], None, None, None)

    m = _ATTRIB_RE.match(s)
    if not m:
        raise Exception('Unparsable line: %s' % line)
    key, value, comment = [x.strip() for x in m.groups()]
    return (None, key, value, comment)


def _add_term_attribute(term, key, value, comment):
    """Stores one parsed key/value pair in term.

    Scalar keys are overwritten and list keys are appended to.

    @param comment: The comment of the line; currently not stored.
    @raises Exception: If key is in neither GOTerm.scalars nor
        GOTerm.lists.
    """
    if key in GOTerm.scalars:
        term[key] = value
    elif key in GOTerm.lists:
        term[key].append(value)
    else:
        raise Exception('Unknown key %s: %s' % (key, value))


def read_gene_ontology(fd):
    """Reads the Gene Ontology from an obo file.

    Only '[Term]' sections are read.  Other sections are reported on
    standard output and skipped, as are lines in front of the first
    section.  The terms are added as unconnected nodes; use
    GeneOntology.link_ontology and GeneOntology.link_relationships to
    create the edges.

    @param fd: An open file object to the obo file.
    @returns: A L{GeneOntology} containing all terms of the file.
    @raises Exception: If a line cannot be parsed or a term uses an
        unknown key.
    """
    go = GeneOntology()
    term = None

    for line in fd:
        s, k, v, c = _split_obo_line(line)
        if s is None and k is None:
            ## Blank or comment-only line.
            continue

        if s is not None:
            ## A new section starts, so the previous term is complete.
            if term is not None:
                go.add_term(term)
            if s == 'Term':
                term = GOTerm()
            else:
                term = None
                print("ignoring: %s" % s)
        elif term is not None:
            _add_term_attribute(term, k, v, c)

    ## The last term of the file is not followed by a section line.
    if term is not None:
        go.add_term(term)

    return go


def pickle_gene_ontology(go, fn):
    """Writes the ontology go to the file named fn using pickle."""
    fd = open(fn, 'wb')
    try:
        pickle.dump(go, fd)
    finally:
        fd.close()


def load_pickled_ontology(fn):
    """Reads an ontology written by L{pickle_gene_ontology}.

    @param fn: Name of the pickle file.  Unpickling can execute arbitrary
        code, so only files from a trusted source must be loaded.
    @returns: The unpickled object.
    """
    fd = open(fn, 'rb')
    try:
        return pickle.load(fd)
    finally:
        fd.close()


def read_default_go():
    """Reads and links the ontology installed at the system wide path.

    @returns: A L{GeneOntology} with the 'is_a' and 'relationship' edges
        added.
    """
    f = open("/usr/share/gene-ontology/gene_ontology.obo")
    try:
        go = read_gene_ontology(f)
    finally:
        ## Close the file even if it cannot be parsed.
        f.close()
    go.link_ontology('is_a')
    go.link_relationships()
    return go


def _add_subgraphs(treestore, ontology, parent, nodes):
    """Recursively inserts nodes and all their successors below parent.

    Every row holds the id, the name, a flag that starts out False and
    the node itself.  Rows are inserted at position 0 of their parent.
    """
    for n in nodes:
        i = treestore.insert(parent, 0, (n['id'], n['name'], False, n))
        _add_subgraphs(treestore, ontology, i, ontology.successors(n))


def get_go_treestore(ontology):
    """Returns a gtk.TreeStore holding the biological_process tree.

    @param ontology: A linked L{GeneOntology}.
    """
    ts = gtk.TreeStore(gobject.TYPE_STRING,    ## ID
                       gobject.TYPE_STRING,    ## Name
                       gobject.TYPE_BOOLEAN,   ## Selected
                       gobject.TYPE_PYOBJECT)  ## Node
    _add_subgraphs(ts, ontology, None, [ontology.get_bp()])
    return ts


class NetworkTreeModel(gtk.GenericTreeModel):
    """Tree model exposing a network as a tree below a root node.

    The model has a single string column holding the 'id' of the node.
    Most callbacks are placeholders that return None.
    """

    def __init__(self, network, root):
        gtk.GenericTreeModel.__init__(self)
        self._network = network
        self._root = root

    def on_get_flags(self):
        return 0

    def on_get_n_columns(self):
        return 1

    def on_get_column_type(self, index):
        ## Only column 0 exists; None is returned for any other index.
        if index == 0:
            return gobject.TYPE_STRING

    def on_get_iter(self, path):
        """Returns the node found by following path down from the root.

        The first element of path is skipped and every following element
        is used as an index into the predecessors of the current node.
        """
        ## NOTE(review): _add_subgraphs descends through successors while
        ## this walks predecessors; confirm which direction is intended.
        node = self._root
        for p in path[1:]:
            children = self._network.predecessors(node)
            node = children[p]
        return node

    def on_get_path(self, rowref):
        pass

    def on_get_value(self, rowref, column):
        print('get_value')
        return rowref['id']

    def on_iter_next(self, rowref):
        pass

    def on_iter_children(self, parent):
        pass

    def on_iter_has_child(self, rowref):
        pass

    def on_iter_n_children(self, rowref):
        pass

    def on_iter_nth_child(self, parent, n):
        pass

    def on_iter_parent(self, child):
        pass