New module file_io which takes care of this. All output should go through file_io.write, which encodes it using the appropriate character encoding. For output to a temporary file, use "with file_io.tmpfile('name')". Moved WorblehatException to new module exc, so all modules can access it without importing worblehat.
230 lines
7.0 KiB
230 lines
7.0 KiB
import re
import types
from exc import WorblehatException
# The possible fields for each type of object, keyed by object type
# ('book', 'person', 'category' -- the same names used as 'type' in
# action_fields).
# Each field is a tuple (fieldname, fieldtype). Fieldtype is one of
# 's' (string), 'i' (integer), 'd' (dictionary, where the values are
# lists of strings) or 'l' (list of strings).
# The order of the tuples is the order in which fields are read and
# written.
fields = {
    'book': [
        ('isbn', 's'), ('id', 's'), ('title', 's'), ('category', 's'),
        ('subtitle', 's'), ('persons', 'd'), ('publisher', 's'),
        ('published_year', 'i'), ('edition', 'i'), ('pages', 'i'),
        ('series', 's'), ('description', 's'),  # TODO picture, thumbnail
        ('references', 'd'),
    ],
    'person': [
        ('id', 's'), ('firstname', 's'), ('lastname', 's'),
    ],
    'category': [
        ('id', 's'), ('name', 's'), ('placement', 'l'),
    ],
}
# Fields associated with each action, keyed by action name.
# The 'type' is a key in the fields dictionary, indicating the set of
# fields which can be used with the action. The 'required' list is a
# list of fields which must be present for the action to be accepted.
#
# NOTE(review): the action names were missing from this table. The
# 'new-*' and 'edit-*' names follow the sample call at the bottom of
# the file ('new-book', 'edit-book', 'edit-category'); the 'delete-*'
# names are reconstructed by analogy -- confirm against the callers.
action_fields = {
    'new-book': {
        'type': 'book',
        'required': ['isbn', 'title', 'category'],
    },
    'edit-book': {
        'type': 'book',
        'required': ['isbn'],
    },
    'delete-book': {
        'type': 'book',
        'required': ['isbn'],
    },
    'new-person': {
        'type': 'person',
        'required': ['id', 'firstname', 'lastname'],
    },
    'edit-person': {
        'type': 'person',
        'required': ['id'],
    },
    'delete-person': {
        'type': 'person',
        'required': ['id'],
    },
    'new-category': {
        'type': 'category',
        'required': ['id', 'name'],
    },
    'edit-category': {
        'type': 'category',
        'required': ['id'],
    },
    'delete-category': {
        'type': 'category',
        'required': ['id'],
    },
}
class CommitFormatSyntaxError(WorblehatException):
    """Syntax error in commit-format text, tied to a line number.

    The message handed to WorblehatException is
    'Syntax error on line <linenr>: <msg>'.
    """

    def __init__(self, msg, linenr):
        full_message = 'Syntax error on line %d: %s' % (linenr, msg)
        WorblehatException.__init__(self, full_message)
class CommitFormatError(WorblehatException):
    """Commit-format error that carries its message unchanged."""

    def __init__(self, msg):
        # Plain pass-through: the message is not decorated in any way.
        WorblehatException.__init__(self, msg)
def read_field_value_str(val):
    """Parse the raw text of a string field.

    Returns None when val is empty or only whitespace. Otherwise every
    line of val is stripped, the lines are re-joined with newlines and
    the joined text is stripped once more.
    """
    if val.strip() == '':
        return None
    stripped_lines = [piece.strip() for piece in val.split('\n')]
    return '\n'.join(stripped_lines).strip()
def read_field_value_int(val):
    """Parse the raw text of an integer field.

    Returns None when val is empty or only whitespace, otherwise the
    integer value of the stripped text.

    Raises WorblehatException when the text is not an integer.
    """
    if val.strip() == '':
        return None
    try:
        return int(val.strip())
    # The parenthesised tuple matters: 'except ValueError, TypeError:'
    # would catch only ValueError and bind it to the name TypeError.
    except (ValueError, TypeError):
        raise WorblehatException('%s is not an integer' % val)
def read_field_value_dict(val):
    """Parse the raw text of a dictionary field.

    Each non-empty input line has the form '<key> <value>'; the value
    is everything after the first space. Repeated keys accumulate, so
    the result maps each key to the list of its values in input order.
    Empty or whitespace-only input gives an empty dictionary.

    A line without a space raises ValueError (from the unpacking).
    """
    d = {}
    if val.strip() == '':
        return d
    for line in val.strip().split('\n'):
        key, value = line.strip().split(' ', 1)
        # Append to an existing list, or start one for a new key.
        d.setdefault(key, []).append(value)
    return d
def read_field_value_list(val):
    """Parse the raw text of a list field into a list of strings.

    Items are separated by any run of whitespace, so repeated spaces
    and continuation lines (joined with newlines) do not produce empty
    or merged items. Empty or whitespace-only input gives an empty
    list.
    """
    # split() with no separator drops leading/trailing whitespace and
    # never yields empty strings, unlike split(' ').
    return val.split()
def read_action(lines):
    """Parse text as an action, returning a dictionary.

    lines is a non-empty sequence of (linenr, line) tuples making up
    one paragraph. A line starting with a space or tab continues the
    value of the previous field; any other line must have the form
    '<field>:<value>'.

    The result has the key 'action' plus one key for every field of
    the action's object type (see fields). Fields present in the text
    are converted according to their field type; absent ones are None.

    Raises CommitFormatSyntaxError for an empty line, a leading
    continuation line, a line without ':', a missing or unknown
    'action' field, or a missing required field.
    """
    # Debug tracing kept from the original implementation.
    print('reading action')
    print('lines:')
    print(lines)

    # First pass: collect the raw text of every field.
    d = {}
    lastfield = None
    for (linenr, line) in lines:
        if len(line) == 0:
            raise CommitFormatSyntaxError('Empty line in action', linenr)
        if line[0] in [' ', '\t']:  # continuation line
            if not lastfield:
                raise CommitFormatSyntaxError(
                    'First line is continuation line: ' + line, linenr)
            d[lastfield] = d[lastfield] + '\n' + line.strip()
        else:
            if ':' not in line:
                # Report the offending line instead of leaking the
                # ValueError from the tuple unpacking below.
                raise CommitFormatSyntaxError(
                    'Missing \':\' in line: ' + line, linenr)
            field, value = line.split(':', 1)
            d[field] = value.strip()
            lastfield = field

    # Errors about the action as a whole are reported on its first line.
    firstlinenr = lines[0][0]
    if 'action' not in d:
        raise CommitFormatSyntaxError('Missing \'action\' field', firstlinenr)
    action = d['action']
    print('dict:')
    print(d)

    if action not in action_fields:
        # Previously surfaced as a bare KeyError.
        raise CommitFormatSyntaxError('Unknown action \'%s\'' % action,
                                      firstlinenr)
    for field in action_fields[action]['required']:
        if field not in d:
            raise CommitFormatSyntaxError(
                'Missing required field \'%s\' in \'%s\' action' %
                (field, action),
                firstlinenr)

    # Second pass: convert the raw text according to each field's type.
    readers = {'s': read_field_value_str,
               'i': read_field_value_int,
               'd': read_field_value_dict,
               'l': read_field_value_list}
    data_type = action_fields[action]['type']
    result = {'action': action}
    for field, ftype in fields[data_type]:
        if field in d:
            result[field] = readers[ftype](d[field])
        else:
            result[field] = None
    print('final dict:')
    print(result)
    return result
def read_paragraphs(text):
    """Split text into paragraphs of numbered lines.

    Lines starting with '#' are comments and are dropped without
    ending the current paragraph. Lines that are empty or contain only
    spaces and tabs separate paragraphs. Every other line is stored as
    a (linenr, line) tuple, where linenr counts from 1 over all lines
    of text, comments and blanks included.

    Returns a list of paragraphs, each a non-empty list of such tuples.
    """
    comment_re = re.compile(r'^#.*$')
    blank_re = re.compile(r'^[ \t]*$')
    paragraphs = []
    current_para = []
    for linenr, line in enumerate(text.split('\n'), 1):
        if comment_re.match(line):
            # Debug tracing kept from the original implementation.
            print('comment: %s' % line)
        elif blank_re.match(line):
            print('blank: %s' % line)
            if current_para:
                paragraphs.append(current_para)
                current_para = []
        else:
            current_para.append((linenr, line))
    # Text that does not end in a blank line leaves a paragraph open.
    if current_para:
        paragraphs.append(current_para)
    return paragraphs
def read_actionlist(text):
    """Parse text as a list of actions.

    The text is split into paragraphs with read_paragraphs and each
    paragraph is parsed with read_action.

    The result is a list of dictionaries.
    """
    paragraphs = read_paragraphs(text)
    # Debug tracing kept from the original implementation.
    print('paragraphs: %s' % (paragraphs,))
    # A list comprehension returns a real list on Python 2 and 3 alike.
    return [read_action(paragraph) for paragraph in paragraphs]
def write_field_value_str(val):
    """Format a string field value as the text following 'field:'.

    Every line of the value is written as ' ' + line + newline. A
    value of more than one line is preceded by a newline, so that all
    of its lines become continuation lines. A false value (None, '')
    is written like the empty string.
    """
    text = val if val else ''
    value_lines = text.split('\n')
    body = ''.join(' ' + piece + '\n' for piece in value_lines)
    if len(value_lines) > 1:
        return '\n' + body
    return body
def write_field_value_int(val):
    """Format an integer field value as the text following 'field:'.

    None is written as an empty value, which read_field_value_int
    reads back as None. Writing it as ' None' would make the output
    unreadable, since 'None' is not an integer.
    """
    if val is None:
        return ' \n'
    return ' %s\n' % str(val)
def write_field_value_dict(val):
    """Format a dictionary field value as the text following 'field:'.

    val maps each key to a list of strings. The output starts with a
    newline, followed by one continuation line ' <key> <value>' per
    value. None is treated like an empty dictionary, matching what
    read_action stores for fields absent from the input.
    """
    pieces = ['\n']
    for key, values in (val or {}).items():
        for single_value in values:
            pieces.append(' ' + key + ' ' + single_value + '\n')
    return ''.join(pieces)
def write_field_value_list(val):
    """Format a list field value as the text following 'field:'.

    The items are written on one line, each preceded by a space, and
    the line is terminated with a newline like the output of the other
    field writers; without it the next field or action would be glued
    onto this line. None is treated like an empty list.
    """
    return ''.join(' ' + single_value for single_value in (val or [])) + '\n'
def make_comment(s):
    """Turn s into comment text: '# ' before every line, newline at the end."""
    commented = ['# ' + text_line for text_line in s.split('\n')]
    return '\n'.join(commented) + '\n'
def write_action(d):
    """Format one entry of an action list as text.

    d is either a string, which is written as a comment, or an action
    dictionary. A dictionary must have the key 'action'; an optional
    'comment' is written first. After the 'action:' line, every field
    of the action's object type that is present in d is written in the
    order given by fields. Keys of d that are not fields of that type
    are ignored.
    """
    # isinstance also accepts subclasses of the string types. The
    # fallback covers interpreters whose types module has no
    # StringTypes.
    string_types = getattr(types, 'StringTypes', (str,))
    if isinstance(d, string_types):
        return make_comment(d)

    # One writer per field type, built once instead of once per field.
    writers = {'s': write_field_value_str,
               'i': write_field_value_int,
               'd': write_field_value_dict,
               'l': write_field_value_list}
    lines = u''
    if 'comment' in d:
        lines += make_comment(d['comment'])
    action = d['action']
    lines += 'action: ' + action + '\n'
    data_type = action_fields[action]['type']
    for field, ftype in fields[data_type]:
        if field in d:
            lines += field + ':' + writers[ftype](d[field])
    return lines
def write_actionlist(actions):
    """Format each entry of actions with write_action, joined by newlines."""
    rendered = [write_action(entry) for entry in actions]
    return '\n'.join(rendered)
# test: print write_actionlist([{'comment':'Foo!\nBar!','action':'new-book','isbn':'434545'},{'action':'edit-book','isbn':'654654745','persons':{'author':['ab','foo'],'illustrator':['moo']}},'This\nis\na\ncomment.',{'action':'edit-category','id':'matematikk','name':'Matematikk','placement':['T10','T11']}])