This repository has been archived on 2024-07-04. You can view files and clone it, but cannot push or open issues or pull requests.
laydi/workflows/geneontology.py

255 lines
7.1 KiB
Python

import gobject
import gtk
import networkx
import re
class GOTerm:
    """A single Gene Ontology term, stored as a dict of attribute values.

    Attributes named in GOTerm.lists start out as empty lists, attributes
    named in GOTerm.scalars start out as None. Values are read and written
    with item access (term['id']); reading an unknown key returns None
    instead of raising KeyError.
    """

    # Keys whose value is a list that gets appended to.
    lists = ['is_a', 'alt_id', 'exact_synonym', 'broad_synonym',
             'narrow_synonym', 'related_synonym', 'relationship',
             'subset', 'synonym', 'xref_analog', 'xref_unknown']

    # Keys that hold a single value.
    scalars = ['name', 'id', 'namespace', 'def', 'is_transitive',
               'comment', 'is_obsolete']

    def __init__(self):
        self.d = {}
        ## Create empty lists for all list values
        for key in GOTerm.lists:
            self.d[key] = []
        ## ... and explicit None placeholders for all scalar values.
        ## Each loop uses its own variable, so the scalar defaults can no
        ## longer overwrite the last list entry with None.
        for key in GOTerm.scalars:
            self.d[key] = None

    def __getitem__(self, key):
        """Returns the value stored under key, or None if key is unknown."""
        return self.d.get(key)

    def __setitem__(self, key, value):
        """Stores value under key, replacing any previous value."""
        self.d[key] = value
class GeneOntology(networkx.XDiGraph):
    """Directed graph whose nodes are ontology terms.

    Terms are indexed by their 'id' value in self.by_id. Edges are added
    from the referenced term to the term that declares the link, with the
    link type passed as the third argument to add_edge.
    """

    def __init__(self):
        networkx.XDiGraph.__init__(self)
        # Maps term['id'] to the term object.
        self.by_id = {}
        # Undirected copy of the graph, built on first use by old_subsumer.
        self.undirected = None

    def add_term(self, term):
        """Adds term as a node and registers it under term['id']."""
        self.add_node(term)
        self.by_id[term['id']] = term

    def link_ontology(self, linkattr, obsolete=False):
        """Adds one edge per id listed under linkattr in every node.

        @param linkattr: Name of a list-valued term attribute holding term
            ids (read_default_go passes 'is_a').
        @param obsolete: Not used by this method.
        @raises KeyError: If a listed id is not present in self.by_id.
        """
        for node in self.nodes():
            for link in node[linkattr]:
                self.add_edge(self.by_id[link], node, linkattr)

    def link_relationships(self):
        """Adds edges for the 'relationship' entries of every node.

        Each entry is split on a single space into a link type and a term
        id; entries that do not split into exactly two parts raise
        ValueError.
        """
        for node in self.nodes():
            for link in node['relationship']:
                link_type, term = link.split(' ')
                self.add_edge(self.by_id[term.strip()], node, link_type.strip())

    def get_bp(self):
        """Returns the root node of the biological_process tree"""
        return self.by_id['GO:0008150']

    def get_cc(self):
        """Returns the root node of the cellular_component tree"""
        # The key is the bare id, like the other two root lookups; the
        # previous 'id: GO:0005575' could never match a stored id.
        return self.by_id['GO:0005575']

    def get_mf(self):
        """Returns the root node of the molecular_function tree"""
        return self.by_id['GO:0003674']

    def _subsumer(self, t1, t2, heap):
        """Searches breadth-first for a term with paths to both t1 and t2.

        @param heap: Initial list of candidate terms; it is not modified.
        @returns: The first candidate, in queue order, from which
            shortest_path finds a path to both t1 and t2, or None when the
            queue runs empty.
        """
        queue = list(heap)
        # A candidate that already failed fails again, and its in-neighbours
        # are already queued, so skipping repeats does not change the result.
        seen = set()
        while queue:
            t = queue.pop(0)
            if t in seen:
                continue
            seen.add(t)
            p1 = networkx.shortest_path(self, t, t1)
            p2 = networkx.shortest_path(self, t, t2)
            if p1 and p2:
                return t
            queue.extend(self.in_neighbors(t))
        return None

    def subsumer(self, t1, t2):
        """Returns a term from which both t1 and t2 can be reached.

        If one of the two terms has a path to the other, that term is
        returned; otherwise the in-neighbours of t1 are searched upwards
        with _subsumer. Returns None if no such term is found.
        """
        if t1 == t2:
            return t1
        if networkx.shortest_path(self, t1, t2):
            return t1
        elif networkx.shortest_path(self, t2, t1):
            return t2
        return self._subsumer(t1, t2, self.in_neighbors(t1))

    def old_subsumer(self, t1, t2):
        """Earlier subsumer implementation based on an undirected path.

        Finds a path between t1 and t2 in an undirected copy of the graph
        and returns the first term on it that has directed paths to both
        t1 and t2. Prints a diagnostic and returns None when it fails.
        """
        if t1 == t2:
            return t1
        if self.undirected is None:
            self.undirected = self.to_undirected()
        path = networkx.shortest_path(self.undirected, t1, t2)
        if not path:
            print("Woah, path not found.")
            return None
        # NOTE(review): this compares against the literal list [1]; it may
        # have been meant as a single-node path check -- confirm.
        if path == [1]:
            print("This shouldn't happen")
            return t1
        for t in path:
            if networkx.shortest_path(self, t, t1) and \
               networkx.shortest_path(self, t, t2):
                return t
        print("GeneOntology.subsumer: should not reach this point")
        print("path is now: %s" % path)
        print("ids are: %s " % [x['id'] for x in path])
        return None
def _split_obo_line(line):
"""Splits a line from an obo file in its three constituent parts.
@param line: A string containing a line from an obo file. The line must
either be a section definition field with a section name in brackets
or a line of the form: keyword: value ! comment
@returns: A tuple of four strings conaining the section, key, value and
description defined in the string. If the section part is None, all
the other fields are strings and if section is a string, all the other
fields are None.
"""
attrib_re = re.compile(r'^\s*([\w-]+)\s*:\s*([^!]*)!?(.*$)')
s = line.strip()
if s == "":
return (None, None, None, None)
elif s.startswith('[') and s.endswith(']'):
return (s[1:-1], None, None, None)
else:
m = attrib_re.match(s)
if m:
key, value, comment = [x.strip() for x in m.groups()]
return (None, key, value, comment)
else:
raise Exception('Unparsable line: %s' % line)
def _add_term_attribute(term, key, value, comment):
    """Stores one parsed key/value pair on a term.

    @param term: The term to update; accessed with item syntax.
    @param key: Attribute name. Names in GOTerm.scalars are assigned,
        names in GOTerm.lists are appended to.
    @param value: The value to store.
    @param comment: Accepted but not used.
    @raises Exception: If key is in neither GOTerm.scalars nor GOTerm.lists.
    """
    is_scalar = key in GOTerm.scalars
    if not is_scalar and key not in GOTerm.lists:
        raise Exception('Unknown key %s: %s' % (key, value))

    if is_scalar:
        term[key] = value
    else:
        term[key].append(value)
def read_gene_ontology(fd):
    """Builds a GeneOntology from the [Term] sections of an obo file.

    Lines are read with fd.readline() until it returns an empty string.
    Attribute lines that appear outside a [Term] section are ignored, as
    are blank lines. No edges are created here.

    @param fd: An open file object to the obo file.
    @returns: A GeneOntology holding one node per [Term] section.
    """
    ontology = GeneOntology()
    current = None  # the term being filled in, or None outside [Term]

    for raw in iter(fd.readline, ''):
        section_name, key, value, comment = _split_obo_line(raw)

        # Nothing parsed from this line.
        if section_name is None and key is None:
            continue

        # Attribute line: only meaningful while a term is open.
        if not section_name:
            if current is not None:
                _add_term_attribute(current, key, value, comment)
            continue

        # Section header: store the finished term, then open a new one
        # only when the section is a [Term].
        if current is not None:
            ontology.add_term(current)
        if section_name == 'Term':
            current = GOTerm()
        else:
            current = None

    # The last term has no following section header to flush it.
    if current is not None:
        ontology.add_term(current)
    return ontology
def read_default_go(path="/usr/share/gene-ontology/gene_ontology.obo"):
    """Reads an obo file and links its terms into a graph.

    @param path: Location of the obo file; defaults to the system-wide
        gene_ontology.obo.
    @returns: A GeneOntology with 'is_a' and relationship edges added.
    """
    f = open(path)
    # Close the file even when parsing raises (unparsable line, unknown key).
    try:
        go = read_gene_ontology(f)
    finally:
        f.close()
    go.link_ontology('is_a')
    go.link_relationships()
    return go
def _add_subgraphs(treestore, ontology, parent, nodes):
    """Recursively inserts nodes and all their successors into treestore.

    @param treestore: The store receiving the rows.
    @param ontology: Graph queried with successors() for each node.
    @param parent: Row under which the nodes are inserted (None is passed
        for the initial call).
    @param nodes: The terms to insert at this level; each is inserted at
        position 0 under parent.
    """
    for term in nodes:
        row = (term['id'], term['name'], False, term)
        row_iter = treestore.insert(parent, 0, row)
        children = ontology.successors(term)
        _add_subgraphs(treestore, ontology, row_iter, children)
def get_go_treestore(ontology):
    """Creates a gtk.TreeStore filled from the ontology's get_bp() root.

    @param ontology: Object providing get_bp() and successors().
    @returns: A TreeStore with columns (id, name, selected, node).
    """
    column_types = (
        gobject.TYPE_STRING,    ## ID
        gobject.TYPE_STRING,    ## Name
        gobject.TYPE_BOOLEAN,   ## Selected
        gobject.TYPE_PYOBJECT,  ## Node
    )
    store = gtk.TreeStore(*column_types)
    roots = [ontology.get_bp()]
    _add_subgraphs(store, ontology, None, roots)
    return store
class NetworkTreeModel(gtk.GenericTreeModel):
    """Single-column tree model over a network, starting at a root node.

    Only on_get_iter and on_get_value do any work; the other row
    navigation callbacks are placeholders that return None.
    """

    def __init__(self, network, root):
        gtk.GenericTreeModel.__init__(self)
        self._network = network
        self._root = root

    def on_get_flags(self):
        return 0

    def on_get_n_columns(self):
        return 1

    def on_get_column_type(self, index):
        # Only column 0 is defined; any other index yields None.
        if index != 0:
            return None
        return gobject.TYPE_STRING

    def on_get_iter(self, path):
        """Walks from the root along path and returns the node reached.

        The first element of path is skipped; every later element indexes
        into the predecessors of the current node.
        """
        # NOTE(review): rows are taken from predecessors(), whereas
        # _add_subgraphs descends via successors() -- confirm the intended
        # edge direction.
        current = self._root
        for position in path[1:]:
            current = self._network.predecessors(current)[position]
        return current

    def on_get_path(self, rowref):
        return None

    def on_get_value(self, rowref, column):
        print('get_value')
        return rowref['id']

    def on_iter_next(self, rowref):
        return None

    def on_iter_children(self, parent):
        return None

    def on_iter_has_child(self, rowref):
        return None

    def on_iter_n_children(self, rowref):
        return None

    def on_iter_nth_child(self, parent, n):
        return None

    def on_iter_parent(self, child):
        return None