First commit
This commit is contained in:
parent
eb45c125aa
commit
23fb12f53a
61
system/dataset.py
Normal file
61
system/dataset.py
Normal file
@ -0,0 +1,61 @@
|
||||
#import logger
|
||||
from scipy import array,take,asarray,shape
|
||||
import project
|
||||
#from sets import Set as set
|
||||
from itertools import izip
|
||||
|
||||
|
||||
class Dataset:
    """Array with a named dimension and named identifiers on every axis.

    Attributes:
        dims: Shape of the wrapped array.
        def_list: The ``[dim_name, ids]`` definitions given to the constructor.
        ids: ``{dim_name: {identifier: index}}``, one entry per axis.
        parents: The ``parents`` argument given to the constructor (or None).
        children: Datasets that named this dataset as one of their parents.
    """

    def __init__(self, input_array, def_list, parents=None):
        """Wrap ``input_array`` and index its axes by name.

        Args:
            input_array: Anything ``asarray`` accepts.
            def_list: One ``[dim_name, ids]`` pair per array axis. An empty
                (or None) ``ids`` makes the dataset generate default
                identifiers for that axis.
            parents: Optional iterable of datasets; this dataset is appended
                to each parent's ``children`` list.

        Raises:
            ValueError: If the number of definitions differs from the number
                of array axes, or if a non-empty ``ids`` sequence does not
                have exactly one identifier per element along its axis.
        """
        self._data = asarray(input_array)
        self.dims = shape(self._data)
        self.parents = parents
        self.def_list = def_list
        self._ids_set = set()
        self.ids = {}
        self.children = []
        self._dim_num = {}

        # Validate everything first, so that a rejected dataset never gets
        # registered as a child of its parents.
        if len(def_list) != len(self.dims):
            raise ValueError("array dims and identifyer mismatch")
        for (_, ids), dim_size in zip(def_list, self.dims):
            # Test emptiness before len() so that ids=None is accepted.
            if ids and len(ids) != dim_size:
                raise ValueError("dim size and identifyer mismatch")

        if parents is not None:
            for parent in parents:
                parent.children.append(self)

        for axis, (dim_name, ids) in enumerate(def_list):
            # NOTE(review): Project.suggest_dim_name only alters names that
            # are already registered, so in this branch it returns dim_name
            # unchanged -- confirm whether the condition is intended.
            if dim_name not in project.c_p.dim_names:
                dim_name = project.c_p.suggest_dim_name(dim_name)
            if not ids:
                ids = self._create_identifiers(axis)
            self.ids[dim_name] = dict(
                (name, num) for num, name in enumerate(ids))
            self._ids_set.update(ids)
            self._dim_num[dim_name] = axis

    def extract_data(self, ids, dim_name):
        """Return a new Dataset holding only ``ids`` along ``dim_name``.

        Args:
            ids: Identifiers to keep, in the order they should appear.
            dim_name: Name of the dimension to select along.

        Returns:
            A new Dataset built from the selected slices. It is given the
            same ``parents`` as this dataset.

        Raises:
            KeyError: If ``dim_name`` or one of ``ids`` is unknown.
            ValueError: If the selection cannot be taken from the array.
        """
        ids = list(ids)
        ids_index = [self.ids[dim_name][id_name] for id_name in ids]
        dim_number = self._dim_num[dim_name]
        try:
            out_data = take(self._data, ids_index, axis=dim_number)
        except Exception:
            raise ValueError(
                "could not extract %r along dimension %r" % (ids, dim_name))

        # Copy each [dim_name, ids] pair: a plain slice copy would share the
        # inner lists, and the assignment below would then overwrite the
        # identifiers recorded in this dataset's own def_list.
        new_def_list = [list(entry) for entry in self.def_list]
        new_def_list[dim_number][1] = ids
        return Dataset(out_data, def_list=new_def_list, parents=self.parents)

    def _create_identifiers(self, axis):
        """Return default identifiers ``'<axis>_<index>'`` for ``axis``."""
        return ["%s_%s" % (axis, i) for i in range(self.dims[axis])]
|
||||
|
||||
class Selection:
    """Holder for a selection mapping, which starts out empty."""

    def __init__(self):
        """Create the selection with an empty ``current_selection`` dict."""
        self.current_selection = dict()
|
49
system/project.py
Normal file
49
system/project.py
Normal file
@ -0,0 +1,49 @@
|
||||
#import logger
|
||||
import dataset
|
||||
|
||||
class Project:
    """Keeps datasets, known dimension names and the current selection.

    Observers registered with ``attach`` have ``update(project)`` called
    whenever ``notify`` runs (``set_selection`` calls it after every change).
    """

    def __init__(self, name="Testing"):
        """Create an empty project called ``name``."""
        self.name = name
        self.dim_names = []
        self._selection_observers = []
        self.current_selection = {}
        self.current_data = []
        self.datasets = []

    def attach(self, observer):
        """Register ``observer``; registering it twice has no effect."""
        if observer not in self._selection_observers:
            self._selection_observers.append(observer)

    def detach(self, observer):
        """Unregister ``observer``; unknown observers are ignored."""
        try:
            self._selection_observers.remove(observer)
        except ValueError:
            pass

    def notify(self, modifier=None):
        """Call ``update(self)`` on every observer except ``modifier``."""
        # Iterate over a snapshot so an observer may attach or detach
        # itself from inside its update() without disturbing the loop.
        for observer in list(self._selection_observers):
            if modifier != observer:
                observer.update(self)

    def set_selection(self, dim_name, selection):
        """Store ``selection`` (as a set) for ``dim_name`` and notify."""
        self.current_selection[dim_name] = set(selection)
        self.notify()

    def get_selection(self, sel_obj):
        """Return the ``current_selection`` attribute of ``sel_obj``."""
        return sel_obj.current_selection

    def add_dataset(self, dataset):
        """Add ``dataset`` and register any dimension names it introduces.

        Dimension names are read from the keys of ``dataset.ids``.
        """
        self.datasets.append(dataset)
        for dim_name in dataset.ids:
            if dim_name not in self.dim_names:
                self.dim_names.append(dim_name)

    def suggest_dim_name(self, dim_name):
        """Return ``dim_name``, with ``"_t"`` appended if already registered."""
        if dim_name in self.dim_names:
            dim_name = dim_name + "_t"
        return dim_name
|
||||
|
||||
|
||||
# Module-level Project instance; dataset.py reaches it as ``project.c_p``.
c_p = Project()
|
33
test/system/datasettest.py
Normal file
33
test/system/datasettest.py
Normal file
@ -0,0 +1,33 @@
|
||||
import unittest
|
||||
import sys
|
||||
sys.path.append('../../system')
|
||||
from dataset import *
|
||||
from scipy import rand,shape
|
||||
|
||||
class DatasetTest(unittest.TestCase):
    """Tests for creating a Dataset and extracting a subset from it."""

    def setUp(self):
        """Build a random 2x3 dataset with named samples and genes."""
        self.dim_0_ids = ['sample_a', 'sample_b']
        self.dim_1_ids = ['gene_a', 'gene_b', 'gene_c']
        self.dim_labels = ['samples', 'genes']
        self.def_list = [
            [self.dim_labels[0], self.dim_0_ids],
            [self.dim_labels[1], self.dim_1_ids],
        ]
        self.array = rand(2, 3)
        self.testdata = Dataset(self.array, self.def_list)

    def testCreation(self):
        """The dataset keeps the data and indexes every identifier."""
        # ``==`` on arrays is element-wise; reduce it to one boolean, since
        # the truth value of a multi-element array is ambiguous.
        self.assertTrue((self.testdata._data == self.array).all())
        self.assertTrue('sample_a' in self.testdata.ids['samples'])
        self.assertTrue('gene_b' in self.testdata.ids['genes'])

    def testExtraction(self):
        """Extracting two genes gives a 2x2 dataset indexed from zero."""
        ids = ['gene_a', 'gene_b']
        dim_name = 'genes'
        subset = self.testdata.extract_data(ids, dim_name)
        self.assertEqual(shape(subset._data), (2, 2))
        # Compare the whole mapping; the order of dict keys/values is not
        # something the test should depend on.
        self.assertEqual(subset.ids[dim_name], {'gene_a': 0, 'gene_b': 1})
|
||||
|
||||
|
||||
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|
Reference in New Issue
Block a user