Patchwork [3,of,4] tests: add dummy SMTP daemon for SSL tests

login
register
mail settings
Submitter Yuya Nishihara
Date June 8, 2016, 2:23 p.m.
Message ID <85f6121625534279562b.1465395781@mimosa>
Download mbox | patch
Permalink /patch/15437/
State Accepted
Headers show

Comments

Yuya Nishihara - June 8, 2016, 2:23 p.m.
# HG changeset patch
# User Yuya Nishihara <yuya@tcha.org>
# Date 1464356627 -32400
#      Fri May 27 22:43:47 2016 +0900
# Node ID 85f6121625534279562bee84a97886c54af5a7d5
# Parent  8d73e102498a6f53c76fd62ac6730e6c60007f40
tests: add dummy SMTP daemon for SSL tests

Currently it only supports SMTP over SSL since SMTPS should be simpler than
handling StartTLS.

Since we don't need asynchronous server for our tests, it does TLS handshake
in blocking way. But asyncore is required by Python smtpd module.

Patch

diff --git a/tests/dummysmtpd.py b/tests/dummysmtpd.py
new file mode 100755
--- /dev/null
+++ b/tests/dummysmtpd.py
@@ -0,0 +1,81 @@ 
+#!/usr/bin/env python
+
+"""dummy SMTP server for use in tests"""
+
+from __future__ import absolute_import
+
+import asyncore
+import optparse
+import smtpd
+import ssl
+import sys
+
+from mercurial import (
+    cmdutil,
+)
+
+def log(msg):
+    sys.stdout.write(msg)
+    sys.stdout.flush()
+
+class dummysmtpserver(smtpd.SMTPServer):
+    def __init__(self, localaddr):
+        smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None)
+
+    def process_message(self, peer, mailfrom, rcpttos, data):
+        log('%s from=%s to=%s\n' % (peer[0], mailfrom, ', '.join(rcpttos)))
+
+class dummysmtpsecureserver(dummysmtpserver):
+    def __init__(self, localaddr, certfile):
+        dummysmtpserver.__init__(self, localaddr)
+        self._certfile = certfile
+
+    def handle_accept(self):
+        pair = self.accept()
+        if not pair:
+            return
+        conn, addr = pair
+        try:
+            # wrap_socket() would block, but we don't care
+            conn = ssl.wrap_socket(conn, server_side=True,
+                                   certfile=self._certfile,
+                                   ssl_version=ssl.PROTOCOL_TLSv1)
+        except ssl.SSLError:
+            log('%s ssl error\n' % addr[0])
+            conn.close()
+            return
+        smtpd.SMTPChannel(self, conn, addr)
+
+def run():
+    try:
+        asyncore.loop()
+    except KeyboardInterrupt:
+        pass
+
+def main():
+    op = optparse.OptionParser()
+    op.add_option('-d', '--daemon', action='store_true')
+    op.add_option('--daemon-postexec', action='append')
+    op.add_option('-p', '--port', type=int, default=8025)
+    op.add_option('-a', '--address', default='localhost')
+    op.add_option('--pid-file', metavar='FILE')
+    op.add_option('--tls', choices=['none', 'smtps'], default='none')
+    op.add_option('--certificate', metavar='FILE')
+
+    opts, args = op.parse_args()
+    if opts.tls == 'smtps' and not opts.certificate:
+        op.error('--certificate must be specified')
+
+    addr = (opts.address, opts.port)
+    def init():
+        if opts.tls == 'none':
+            dummysmtpserver(addr)
+        else:
+            dummysmtpsecureserver(addr, opts.certificate)
+        log('listening at %s:%d\n' % addr)
+
+    cmdutil.service(vars(opts), initfn=init, runfn=run,
+                    runargs=[sys.executable, __file__] + sys.argv[1:])
+
+if __name__ == '__main__':
+    main()