#! /usr/bin/python
import GnuPGInterface
import string, re, sys
gnupg = GnuPGInterface.GnuPG()
gnupg.options.extra_args += ['--homedir', 'remailer-keys', '--batch']
# The keyring should hold private keys for all the remailers in the chain,
# and each one should have a passphrase equal to the name of the key.
def passphrase(bigname):
return re.search(r'(\w+@\w+\.\w+)', bigname).group(1)
def unwind(recipient, message):
"""Unwind a message. Takes two parameters: the To: address of the
message and the message body itself.
This function will return a list of steps. Each element is a dict
describing what happened on that particular step. The decryption and
remailing are separate steps. The dict keys are as follows:
recipient: the target of that step, as requested by the previous step
encrypted: 1 if the 'Encrypted: PGP' flag was set, else non-existent
anon-to: name in the Anon-To: header, if present
subject: subject specified in a ## header, if present
message: plaintext of the message (for the last step)
Typical steps:
recipient=rem1, encrypted=1 ->recurse BRANCH2
recipient=rem1, anon-to=rem2 BRANCH3, recurse
recipient=rem2, encrypted=1 ->recurse BRANCH2
recipient=rem2, anon-to=user, subject=test BRANCH3, recurse
recipient=user, message=plaintext : BRANCH1, terminate
It throws an exception if anything about the message is incorrect:
message doesn't start with ::\nEncrypted: PGP
message isn't encrypted or is encrypted to the wrong key (not TO)
decrypted message doesn't start with ::\nAnon-To:
"""
# the encrypted message is handled by unwind1. It will look like:
# ::\nEncrypted: PGP\n\n-----BEGIN PGP MESSAGE----\n etc
# once decrypted, the message should look like:
# ::\nAnon-To: dest@foo.com\n\n##\nSubject: test subject\n\nmessage body
if 0:
print "-----"
print message
print "-----"
step = {}
# see if we should end the recursion now
if not re.search(r'^rem\d@test.test', recipient):
# BRANCH 1
# plaintext message
step['recipient'] = recipient
step['message'] = message
return [step]
message = string.split(message, "\n")
if message[0] != '::':
raise 'bad message', 'first line was not ::'
if message[2] != '':
raise 'bad message', 'third line was not blank'
# see if it's encrypted
if message[1] == 'Encrypted: PGP':
# yes, decrypt it and recurse
# BRANCH2
# step is: recipient=rem1, encrypted=1
crypttext = message[3:]
assert(message[3] == '-----BEGIN PGP MESSAGE-----')
# decrypt here
gnupg.passphrase = passphrase(recipient) # passphrase == keyname
p = gnupg.run(['--decrypt'],
create_fhs=['stdin', 'stdout'],
)
p.handles['stdin'].write(string.join(crypttext,"\n"))
p.handles['stdin'].close()
plaintext = p.handles['stdout'].read()
p.handles['stdout'].close()
p.wait()
step['recipient'] = recipient
step['encrypted'] = 1
return [step] + unwind(recipient, plaintext)
# BRANCH3
# message is now:
# ::\nAnon-To: remN@test.set (or dest@foo.com)\n\nmessage body
# or
# ::\nAnon-To: dest@foo.com\n\n##\nSubject: subject\n\nmessage body
r = re.search(r'^Anon-To: (.*)$', message[1])
if not r:
print "Bad Message, no Anon-To"
print message
raise 'bad message', "no anon-to"
step['recipient'] = recipient
step['anon-to'] = r.group(1)
message = message[3:]
# now: "message body", or "##\nSubject: sub\n\nmessage body"
if message[0] == '##':
# there are ## headers included
r = re.search(r'^Subject: (.*)$', message[1])
if not r:
raise 'bad message', "## but no Subject: header"
step['subject'] = r.group(1)
if message[2] != '':
raise 'bad message', "no blank line after ## headers"
message = message[3:]
# now: "message body"
message = string.join(message, "\n")
return [step] + unwind(step['anon-to'], message)
def insistEquals(one, two):
if one != two:
raise "results don't match expected", "'%s' != '%s'" % (one, two)
def test_chain(firsthop, crypttext, plaintext, recipient, chain, subject=None):
"""Verify that the crypttext message does indeed match the plaintext
message, sent to a given recipient and encrypted to the given remailer
chain."""
# chain is a list of remailers with long names: rem1@test.test, etc
# build up the list of what we expect to see at each step
expected_chain = []
for i in range(len(chain)-1):
expected_chain.append({'recipient': chain[i],
'encrypted': 1,
})
expected_chain.append({'recipient': chain[i],
'anon-to': chain[i+1],
})
last = chain[len(chain)-1]
expected_chain.append({'recipient': last,
'encrypted': 1,
})
pentultimate = {'recipient': last,
'anon-to': recipient,
}
if subject:
pentultimate['subject'] = subject
expected_chain.append(pentultimate)
expected_chain.append({'recipient': recipient,
'message': plaintext,
})
# unwind the messge
insistEquals(firsthop, chain[0])
results = unwind(firsthop, crypttext)
# compare against expectations
insistEquals(len(expected_chain), len(results))
for i in range(len(results)):
#print i, results[i], expected_chain[i]
insistEquals(results[i], expected_chain[i])
print "TEST CASE PASSED"
def test1():
m1 = open("m1").read()
chain = unwind("rem3@test.test", m1)
for link in chain: print link
def test2():
m1 = open("m1").read()
chain = ["rem3", "rem2", "rem1", "rem1"]
test_chain(firsthop="rem3@test.test", crypttext=m1,
plaintext="test message\n",
recipient="warner@lothar.com",
chain=chain,
subject="test subject")
def main():
# argv is: ['recipient', 'chain1,chain2,chain3', 'subject']
# plaintext is always "test message\n"
# crypttext arrives on stdin
recipient = sys.argv[1]
chain = string.split(sys.argv[2], ',')
subject = sys.argv[3]
crypttext = sys.stdin.read()
test_chain(firsthop=chain[0],
crypttext=crypttext,
plaintext="test message\n",
recipient=recipient,
chain=chain,
subject=subject)
if __name__ == '__main__':
main()
syntax highlighted by Code2HTML, v. 0.9.1