In my spare time, I'm working on a project where I have to sign and verify SMIME mail using M2Crypto, which works quite well but is thinly documented, especially for its SMIME functions. The Programming S/MIME in Python with M2Crypto howto is enough to point you in the right direction, and the source has a few SMIME examples. What is missing is a recipe to verify signed SMIME messages when you don't have the signer's certificate, which is the usual situation when you have to verify Internet email.
OpenSSL's smime command is able to do that, so there should be a way to accomplish the same thing from Python using M2Crypto. After a bit of fiddling around and looking at OpenSSL's source, I found a way which seems to work (update: the content check is now done against the output of SMIME.smime_load_pkcs7_bio instead of using email.message_from_string, a list of certificates is returned on successful verification, and a content diff is shown if verification fails):
#!/usr/bin/python
"""
Simple class to verify SMIME signed email messages
without having to know the signer's certificate.
The signer's certificate(s) is extracted from
the signed message, and returned on successful
verification.

A unified diff of the cleartext content against
the one resulting from verification is returned as
exception value if the content has been tampered
with.

Use at your own risk, send comments and fixes.

May 30, 2004
Ludovico Magnocavallo <ludo@asiatica.org>
"""

import os, base64
from M2Crypto import BIO, SMIME, m2, X509
from difflib import unified_diff

class VerifierError(Exception): pass

class Verifier(object):
    """
    accepts an email payload and verifies it with SMIME
    """

    def __init__(self, certstore):
        """
        certstore - path to the file used to store
                    CA certificates
                    eg /etc/apache/ssl.crt/ca-bundle.crt

        >>> v = Verifier('/etc/dummy.crt')
        >>> v.verify('pippo')
        Traceback (most recent call last):
          File "/usr/lib/python2.3/doctest.py", line 442, in _run_examples_inner
            compileflags, 1) in globs
          File "<string>", line 1, in ?
          File "verifier.py", line 46, in verify
            self._setup()
          File "verifier.py", line 36, in _setup
            raise VerifierError, "cannot access %s" % self._certstore
        VerifierError: cannot access /etc/dummy.crt
        >>>
        """
        self._certstore = certstore
        self._smime = None

    def _setup(self):
        """
        sets up the SMIME.SMIME instance
        and loads the CA certificates store
        """
        smime = SMIME.SMIME()
        st = X509.X509_Store()
        if not os.access(self._certstore, os.R_OK):
            raise VerifierError, "cannot access %s" % self._certstore
        st.load_info(self._certstore)
        smime.set_x509_store(st)
        self._smime = smime

    def verify(self, text):
        """
        verifies a signed SMIME email
        returns a list of certificates used to sign
        the SMIME message on success

        text - string containing the SMIME signed message

        >>> v = Verifier('/etc/apache/ssl.crt/ca-bundle.crt')
        >>> v.verify('pippo')
        Traceback (most recent call last):
          File "<stdin>", line 1, in ?
          File "signer.py", line 23, in __init__
            raise VerifierError, e
        VerifierError: cannot extract payloads from message
        >>>
        >>> certs = v.verify(test_email)
        >>> isinstance(certs, list) and len(certs) > 0
        True
        >>>
        """
        if self._smime is None:
            self._setup()

        buf = BIO.MemoryBuffer(text)
        try:
            p7, data_bio = SMIME.smime_load_pkcs7_bio(buf)
        except SystemError:
            # uncaught exception in M2Crypto
            raise VerifierError, "cannot extract payloads from message"
        if data_bio is not None:
            # keep a copy of the cleartext payload for the tamper check below
            data = data_bio.read()
            data_bio = BIO.MemoryBuffer(data)
        sk3 = p7.get0_signers(X509.X509_Stack())
        if len(sk3) == 0:
            raise VerifierError, "no certificates found in message"
        signer_certs = []
        for cert in sk3:
            # PEM-encode each signer certificate
            signer_certs.append(
                "-----BEGIN CERTIFICATE-----\n%s-----END CERTIFICATE-----" \
                    % base64.encodestring(cert.as_der()))
        self._smime.set_x509_stack(sk3)
        try:
            if data_bio is not None:
                v = self._smime.verify(p7, data_bio)
            else:
                v = self._smime.verify(p7)
        except SMIME.SMIME_Error, e:
            raise VerifierError, "message verification failed: %s" % e
        if data_bio is not None and data != v:
            raise VerifierError, \
                "message verification failed: payload vs SMIME.verify output diff\n%s" % \
                    '\n'.join(list(unified_diff(data.split('\n'), v.split('\n'), n = 1)))
        return signer_certs

test_email = """put your test SMIME signed email here"""

def _test():
    import doctest
    return doctest.testmod()

if __name__ == "__main__":
    _test()
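Two small steps of the class above can be tried without M2Crypto installed: wrapping DER bytes in PEM armor, and building the diff that is reported when the payload has been tampered with. What follows is only a minimal sketch for current Python 3 using the standard library. The helper names der_to_pem and payload_diff are made up for illustration, and ssl.DER_cert_to_PEM_cert stands in for the manual base64 formatting used in the class.

```python
import ssl
from difflib import unified_diff


def der_to_pem(der_bytes):
    """PEM-armor DER bytes (hypothetical helper, standard library only)."""
    return ssl.DER_cert_to_PEM_cert(der_bytes)


def payload_diff(original, verified):
    """Return a unified diff of two text payloads, or '' if they match."""
    if original == verified:
        return ""
    return "\n".join(
        unified_diff(original.split("\n"), verified.split("\n"),
                     n=1, lineterm="")
    )


if __name__ == "__main__":
    # Placeholder bytes, not a real certificate.
    print(der_to_pem(b"not a real certificate"))
    # A one-character change in the second line shows up as -/+ lines.
    print(payload_diff("hello\nworld", "hello\nw0rld"))
```

An empty string from payload_diff means the cleartext and the verified output are identical, which corresponds to the case where the class returns the signer certificates instead of raising.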
I haven't posted anything in a long while since I've been pretty busy working on a complex project in my spare time (let's call it project X), and I've not had Internet access outside office hours for the past couple of months. I hope to start writing again frequently soon, since I'm learning interesting things for project X which involves SMIME and SMTP mail (using OpenSSL, M2Crypto and the email package).
This brief note, done mainly so that I can use it as a reference in the future, regards customizing logging.LogRecord. One of the requirements of project X is that all operations for a single transaction are logged with a unique timestamp, representing the start of the transaction. Since I'm lazy, and I don't trust myself too much when writing code (especially at late hours), I did not want to have to carry around this value everywhere, and remember to pass it to every logging call.
So last night I had a look into the logging internals, to see how to extend logging.LogRecord to carry an additional argument representing my unique timestamp. Apart from being very useful, the logging package is very well architected, so it turns out subclassing LogRecord is not too difficult. Let's see one way of doing it, which is not necessarily the best one so please send me any improvements/suggestions.
LogRecord instances are created every time something is logged. They contain all the information pertinent to the event being logged. [...]
LogRecord has no methods; it's just a repository for information about the logging event. The only reason it's a class rather than a dictionary is to facilitate extension.
LogRecord objects are created by makeRecord, a factory method of the logging.Logger class. To use a custom subclass of LogRecord, you have to subclass logging.Logger and override makeRecord, then set your subclassed Logger class as logging's default which is not too hard.
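As a rough illustration of that subclass-and-override step on current Python 3, here is a minimal sketch. Note that makeRecord now also receives func, extra and sinfo arguments, which the signature used in this post does not have. The names TaggedLogRecord, TaggedLogger, ListHandler and the tag attribute are hypothetical, chosen only for this example.

```python
import logging


class TaggedLogRecord(logging.LogRecord):
    """LogRecord subclass carrying one extra attribute (hypothetical name)."""

    def __init__(self, *args, tag=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.tag = tag


class TaggedLogger(logging.Logger):
    """Logger subclass whose makeRecord factory builds TaggedLogRecord."""

    tag = "untagged"  # class-level default, purely for illustration

    def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
                   func=None, extra=None, sinfo=None):
        record = TaggedLogRecord(name, level, fn, lno, msg, args, exc_info,
                                 func, sinfo, tag=self.tag)
        # Simplified handling of `extra`: copy each key onto the record.
        for key, value in (extra or {}).items():
            setattr(record, key, value)
        return record


class ListHandler(logging.Handler):
    """Helper handler that keeps emitted records in a list."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


def demo():
    # Instantiated directly here, so logging's default class is untouched.
    logger = TaggedLogger("tagged-demo", level=logging.DEBUG)
    logger.tag = "txn-1"
    handler = ListHandler()
    logger.addHandler(handler)
    logger.debug("hello")
    return handler.records


if __name__ == "__main__":
    for rec in demo():
        print(rec.tag, rec.getMessage())
```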
When you call logging.getLogger(name) to get a new logger instance, the getLogger function calls Logger.manager.getLogger(name). Manager.getLogger (Logger.manager is Manager(Logger.root)) does a bunch of stuff, then, if no logger with the same name has already been created, it returns a new instance of _loggerClass for the given name. logging._loggerClass is set by default as _loggerClass = Logger. So to set your Logger subclass as logging's default, just call:
logging.setLoggerClass(CustomLogger)
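The effect of that call can be checked with a short sketch, written for current Python 3. The CustomLogger here is an empty placeholder and the logger name is arbitrary; both are assumptions made for the example. The sketch restores the previous default afterwards so it does not affect other code.

```python
import logging


class CustomLogger(logging.Logger):
    """Placeholder subclass; a real one would override makeRecord."""


def demo():
    previous = logging.getLoggerClass()
    logging.setLoggerClass(CustomLogger)
    try:
        # A name that has not been requested before gets the new class.
        fresh = logging.getLogger("setloggerclass-demo")
    finally:
        logging.setLoggerClass(previous)  # restore the global default
    return fresh


if __name__ == "__main__":
    fresh = demo()
    print(type(fresh).__name__)
    # The root logger already exists when logging is imported,
    # so setLoggerClass does not change its class.
    print(type(logging.getLogger()).__name__)
```

Loggers that were created before the call keep their original class, and so does the root logger, which is why the root logger needs separate treatment.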
Now that we know what to subclass and how to set logging to use it, we still have to decide how to pass the extra argument to LogRecord. To pass it to our CustomLogger class as an additional __init__ argument we would have to subclass logging.Manager and override getLogger, which seems a bit too much work. Another option would be to set the extra parameter directly on every newly obtained CustomLogger instance, but then you would have to remember to set it every time, since logging raises a KeyError exception if you try to use the missing parameter in a format string (you could also set it to a default value in CustomLogger.__init__, but then you'd get a default value in your logs, which IMHO is even worse). The way I chose to do it is to set the parameter on logging.root, so that my extra argument is available to all loggers, since they share a reference to the root logger. To make the root logger emit CustomLogRecord instances, I redefined logging.RootLogger.makeRecord to match CustomLogger.makeRecord.
This solution obviously won't work if you need to set different extra arguments for different loggers, and my custom classes manage only a single extra argument, which is exactly what I need for this project but may not be enough for other cases.
#!/usr/bin/python

import logging
from time import strftime, gmtime

class CustomLogRecord(logging.LogRecord):
    """
    LogRecord subclass that stores a unique -- transaction -- time
    as an additional attribute
    """
    def __init__(self, name, level, pathname, lineno, msg, args, exc_info, trans_time):
        logging.LogRecord.__init__(self, name, level, pathname, lineno, msg, args, exc_info)
        self.trans_time = trans_time

# module-level function, shared by CustomLogger and RootLogger below
def makeRecord(self, name, level, fn, lno, msg, args, exc_info):
    return CustomLogRecord(name, level, fn, lno, msg, args, exc_info, self.root.trans_time)

class CustomLogger(logging.Logger):
    """
    Logger subclass that uses CustomLogRecord as its LogRecord class
    """
    def __init__(self, name, level=logging.NOTSET):
        logging.Logger.__init__(self, name, level)

    makeRecord = makeRecord

# setup
logging.setLoggerClass(CustomLogger)
logging.root.trans_time = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
logging.RootLogger.makeRecord = makeRecord

if __name__ == '__main__':
    # get logger
    logger = logging.getLogger('modified logger')
    logger.propagate = False
    hdlr = logging.StreamHandler()
    hdlr.setFormatter(logging.Formatter(
        '%(trans_time)s [%(asctime)s] %(levelname)s %(process)d %(message)s'))
    logger.addHandler(hdlr)
    logger.setLevel(logging.DEBUG)
    logging.root.addHandler(hdlr)
    logger.debug('test')
    from time import sleep
    sleep(1)
    logger.debug('a second test, with (hopefully) the same date as the previous log record')
    logging.warn('root logging')
The output of the above script should be something like:
Tue, 18 May 2004 12:53:04 +0000 [2004-05-18 14:53:04,898] DEBUG 31268 test
Tue, 18 May 2004 12:53:04 +0000 [2004-05-18 14:53:05,898] DEBUG 31268 a second test, with (hopefully) the same date as the previous log record
Tue, 18 May 2004 12:53:04 +0000 [2004-05-18 14:53:05,898] WARNING 31268 root logging
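For readers on current Python 3 (3.2 and later), the logging module also offers logging.setLogRecordFactory, which did not exist when this note was written. It is a different technique from the subclassing approach described here, but it may give a similar result with less code. The following is only a sketch under that assumption; install_trans_time, demo and the logger name are hypothetical names introduced for the example.

```python
import io
import logging
from time import gmtime, strftime


def install_trans_time(trans_time):
    """Make every new LogRecord carry `trans_time`; return the old factory."""
    old_factory = logging.getLogRecordFactory()

    def factory(*args, **kwargs):
        record = old_factory(*args, **kwargs)
        record.trans_time = trans_time
        return record

    logging.setLogRecordFactory(factory)
    return old_factory


def demo():
    stamp = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime())
    old_factory = install_trans_time(stamp)
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(trans_time)s %(levelname)s %(message)s"))
    logger = logging.getLogger("factory-demo")
    logger.propagate = False
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
    try:
        logger.debug("test")
        logger.warning("second test")
    finally:
        # Undo the global changes so other code is unaffected.
        logger.removeHandler(handler)
        logging.setLogRecordFactory(old_factory)
    return stamp, stream.getvalue()


if __name__ == "__main__":
    print(demo()[1], end="")
```

Because the factory is global, it shares the limitation noted earlier: every logger sees the same single value, so it does not help when different loggers need different extra arguments.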