296 lines
11 KiB
Python
296 lines
11 KiB
Python
"""This file implements all-or-nothing package transformations.
|
|
|
|
An all-or-nothing package transformation is one in which some text is
|
|
transformed into message blocks, such that all blocks must be obtained before
|
|
the reverse transformation can be applied. Thus, if any blocks are corrupted
|
|
or lost, the original message cannot be reproduced.
|
|
|
|
An all-or-nothing package transformation is not encryption, although a block
|
|
cipher algorithm is used. The encryption key is randomly generated and is
|
|
extractable from the message blocks.
|
|
|
|
This class implements the All-Or-Nothing package transformation algorithm
|
|
described in:
|
|
|
|
Ronald L. Rivest. "All-Or-Nothing Encryption and The Package Transform"
|
|
http://theory.lcs.mit.edu/~rivest/fusion.pdf
|
|
|
|
"""
|
|
|
|
__revision__ = "$Id: AllOrNothing.py,v 1.8 2003/02/28 15:23:20 akuchling Exp $"
|
|
|
|
import operator
|
|
import string
|
|
from Crypto.Util.number import bytes_to_long, long_to_bytes
|
|
|
|
|
|
|
|
class AllOrNothing:
|
|
"""Class implementing the All-or-Nothing package transform.
|
|
|
|
Methods for subclassing:
|
|
|
|
_inventkey(key_size):
|
|
Returns a randomly generated key. Subclasses can use this to
|
|
implement better random key generating algorithms. The default
|
|
algorithm is probably not very cryptographically secure.
|
|
|
|
"""
|
|
|
|
def __init__(self, ciphermodule, mode=None, IV=None):
|
|
"""AllOrNothing(ciphermodule, mode=None, IV=None)
|
|
|
|
ciphermodule is a module implementing the cipher algorithm to
|
|
use. It must provide the PEP272 interface.
|
|
|
|
Note that the encryption key is randomly generated
|
|
automatically when needed. Optional arguments mode and IV are
|
|
passed directly through to the ciphermodule.new() method; they
|
|
are the feedback mode and initialization vector to use. All
|
|
three arguments must be the same for the object used to create
|
|
the digest, and to undigest'ify the message blocks.
|
|
"""
|
|
|
|
self.__ciphermodule = ciphermodule
|
|
self.__mode = mode
|
|
self.__IV = IV
|
|
self.__key_size = ciphermodule.key_size
|
|
if self.__key_size == 0:
|
|
self.__key_size = 16
|
|
|
|
__K0digit = chr(0x69)
|
|
|
|
def digest(self, text):
|
|
"""digest(text:string) : [string]
|
|
|
|
Perform the All-or-Nothing package transform on the given
|
|
string. Output is a list of message blocks describing the
|
|
transformed text, where each block is a string of bit length equal
|
|
to the ciphermodule's block_size.
|
|
"""
|
|
|
|
# generate a random session key and K0, the key used to encrypt the
|
|
# hash blocks. Rivest calls this a fixed, publically-known encryption
|
|
# key, but says nothing about the security implications of this key or
|
|
# how to choose it.
|
|
key = self._inventkey(self.__key_size)
|
|
K0 = self.__K0digit * self.__key_size
|
|
|
|
# we need two cipher objects here, one that is used to encrypt the
|
|
# message blocks and one that is used to encrypt the hashes. The
|
|
# former uses the randomly generated key, while the latter uses the
|
|
# well-known key.
|
|
mcipher = self.__newcipher(key)
|
|
hcipher = self.__newcipher(K0)
|
|
|
|
# Pad the text so that its length is a multiple of the cipher's
|
|
# block_size. Pad with trailing spaces, which will be eliminated in
|
|
# the undigest() step.
|
|
block_size = self.__ciphermodule.block_size
|
|
padbytes = block_size - (len(text) % block_size)
|
|
text = text + ' ' * padbytes
|
|
|
|
# Run through the algorithm:
|
|
# s: number of message blocks (size of text / block_size)
|
|
# input sequence: m1, m2, ... ms
|
|
# random key K' (`key' in the code)
|
|
# Compute output sequence: m'1, m'2, ... m's' for s' = s + 1
|
|
# Let m'i = mi ^ E(K', i) for i = 1, 2, 3, ..., s
|
|
# Let m's' = K' ^ h1 ^ h2 ^ ... hs
|
|
# where hi = E(K0, m'i ^ i) for i = 1, 2, ... s
|
|
#
|
|
# The one complication I add is that the last message block is hard
|
|
# coded to the number of padbytes added, so that these can be stripped
|
|
# during the undigest() step
|
|
s = len(text) / block_size
|
|
blocks = []
|
|
hashes = []
|
|
for i in range(1, s+1):
|
|
start = (i-1) * block_size
|
|
end = start + block_size
|
|
mi = text[start:end]
|
|
assert len(mi) == block_size
|
|
cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
|
|
mticki = bytes_to_long(mi) ^ bytes_to_long(cipherblock)
|
|
blocks.append(mticki)
|
|
# calculate the hash block for this block
|
|
hi = hcipher.encrypt(long_to_bytes(mticki ^ i, block_size))
|
|
hashes.append(bytes_to_long(hi))
|
|
|
|
# Add the padbytes length as a message block
|
|
i = i + 1
|
|
cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
|
|
mticki = padbytes ^ bytes_to_long(cipherblock)
|
|
blocks.append(mticki)
|
|
|
|
# calculate this block's hash
|
|
hi = hcipher.encrypt(long_to_bytes(mticki ^ i, block_size))
|
|
hashes.append(bytes_to_long(hi))
|
|
|
|
# Now calculate the last message block of the sequence 1..s'. This
|
|
# will contain the random session key XOR'd with all the hash blocks,
|
|
# so that for undigest(), once all the hash blocks are calculated, the
|
|
# session key can be trivially extracted. Calculating all the hash
|
|
# blocks requires that all the message blocks be received, thus the
|
|
# All-or-Nothing algorithm succeeds.
|
|
mtick_stick = bytes_to_long(key) ^ reduce(operator.xor, hashes)
|
|
blocks.append(mtick_stick)
|
|
|
|
# we convert the blocks to strings since in Python, byte sequences are
|
|
# always represented as strings. This is more consistent with the
|
|
# model that encryption and hash algorithms always operate on strings.
|
|
return map(long_to_bytes, blocks)
|
|
|
|
|
|
def undigest(self, blocks):
|
|
"""undigest(blocks : [string]) : string
|
|
|
|
Perform the reverse package transformation on a list of message
|
|
blocks. Note that the ciphermodule used for both transformations
|
|
must be the same. blocks is a list of strings of bit length
|
|
equal to the ciphermodule's block_size.
|
|
"""
|
|
|
|
# better have at least 2 blocks, for the padbytes package and the hash
|
|
# block accumulator
|
|
if len(blocks) < 2:
|
|
raise ValueError, "List must be at least length 2."
|
|
|
|
# blocks is a list of strings. We need to deal with them as long
|
|
# integers
|
|
blocks = map(bytes_to_long, blocks)
|
|
|
|
# Calculate the well-known key, to which the hash blocks are
|
|
# encrypted, and create the hash cipher.
|
|
K0 = self.__K0digit * self.__key_size
|
|
hcipher = self.__newcipher(K0)
|
|
|
|
# Since we have all the blocks (or this method would have been called
|
|
# prematurely), we can calcualte all the hash blocks.
|
|
hashes = []
|
|
for i in range(1, len(blocks)):
|
|
mticki = blocks[i-1] ^ i
|
|
hi = hcipher.encrypt(long_to_bytes(mticki))
|
|
hashes.append(bytes_to_long(hi))
|
|
|
|
# now we can calculate K' (key). remember the last block contains
|
|
# m's' which we don't include here
|
|
key = blocks[-1] ^ reduce(operator.xor, hashes)
|
|
|
|
# and now we can create the cipher object
|
|
mcipher = self.__newcipher(long_to_bytes(key))
|
|
block_size = self.__ciphermodule.block_size
|
|
|
|
# And we can now decode the original message blocks
|
|
parts = []
|
|
for i in range(1, len(blocks)):
|
|
cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
|
|
mi = blocks[i-1] ^ bytes_to_long(cipherblock)
|
|
parts.append(mi)
|
|
|
|
# The last message block contains the number of pad bytes appended to
|
|
# the original text string, such that its length was an even multiple
|
|
# of the cipher's block_size. This number should be small enough that
|
|
# the conversion from long integer to integer should never overflow
|
|
padbytes = int(parts[-1])
|
|
text = string.join(map(long_to_bytes, parts[:-1]), '')
|
|
return text[:-padbytes]
|
|
|
|
def _inventkey(self, key_size):
|
|
# TBD: Not a very secure algorithm. Eventually, I'd like to use JHy's
|
|
# kernelrand module
|
|
import time
|
|
from Crypto.Util import randpool
|
|
# TBD: key_size * 2 to work around possible bug in RandomPool?
|
|
pool = randpool.RandomPool(key_size * 2)
|
|
while key_size > pool.entropy:
|
|
pool.add_event()
|
|
|
|
# we now have enough entropy in the pool to get a key_size'd key
|
|
return pool.get_bytes(key_size)
|
|
|
|
def __newcipher(self, key):
|
|
if self.__mode is None and self.__IV is None:
|
|
return self.__ciphermodule.new(key)
|
|
elif self.__IV is None:
|
|
return self.__ciphermodule.new(key, self.__mode)
|
|
else:
|
|
return self.__ciphermodule.new(key, self.__mode, self.__IV)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
import getopt
|
|
import base64
|
|
|
|
usagemsg = '''\
|
|
Test module usage: %(program)s [-c cipher] [-l] [-h]
|
|
|
|
Where:
|
|
--cipher module
|
|
-c module
|
|
Cipher module to use. Default: %(ciphermodule)s
|
|
|
|
--aslong
|
|
-l
|
|
Print the encoded message blocks as long integers instead of base64
|
|
encoded strings
|
|
|
|
--help
|
|
-h
|
|
Print this help message
|
|
'''
|
|
|
|
ciphermodule = 'AES'
|
|
aslong = 0
|
|
|
|
def usage(code, msg=None):
|
|
if msg:
|
|
print msg
|
|
print usagemsg % {'program': sys.argv[0],
|
|
'ciphermodule': ciphermodule}
|
|
sys.exit(code)
|
|
|
|
try:
|
|
opts, args = getopt.getopt(sys.argv[1:],
|
|
'c:l', ['cipher=', 'aslong'])
|
|
except getopt.error, msg:
|
|
usage(1, msg)
|
|
|
|
if args:
|
|
usage(1, 'Too many arguments')
|
|
|
|
for opt, arg in opts:
|
|
if opt in ('-h', '--help'):
|
|
usage(0)
|
|
elif opt in ('-c', '--cipher'):
|
|
ciphermodule = arg
|
|
elif opt in ('-l', '--aslong'):
|
|
aslong = 1
|
|
|
|
# ugly hack to force __import__ to give us the end-path module
|
|
module = __import__('Crypto.Cipher.'+ciphermodule, None, None, ['new'])
|
|
|
|
a = AllOrNothing(module)
|
|
print 'Original text:\n=========='
|
|
print __doc__
|
|
print '=========='
|
|
msgblocks = a.digest(__doc__)
|
|
print 'message blocks:'
|
|
for i, blk in map(None, range(len(msgblocks)), msgblocks):
|
|
# base64 adds a trailing newline
|
|
print ' %3d' % i,
|
|
if aslong:
|
|
print bytes_to_long(blk)
|
|
else:
|
|
print base64.encodestring(blk)[:-1]
|
|
#
|
|
# get a new undigest-only object so there's no leakage
|
|
b = AllOrNothing(module)
|
|
text = b.undigest(msgblocks)
|
|
if text == __doc__:
|
|
print 'They match!'
|
|
else:
|
|
print 'They differ!'
|