#! /usr/bin/env python3
# -*- coding: utf-8 -*-

# Reaction Mailcreate
# Copyright (C) 2020-2024 Tobias Dussa <tobias-reaction-mailcreate@dussa.de>

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from email.generator import Generator
from email.message import EmailMessage
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
import argparse
import csv
import datetime
import getpass
import hashlib
import jinja2
import os
import secrets
import sys

if sys.version_info < (3, 11):
    sys.exit('ERROR: This script requires Python 3.11 or better.')

parser = argparse.ArgumentParser()


class KeyValueAction(argparse.Action):
    """argparse action that collects ``KEY=VALUE`` options into a dictionary.

    Every occurrence of the option adds its pair(s) to the dictionary stored
    under ``dest``; a later occurrence of the same key overrides an earlier one.
    """

    def __call__(self, parser, namespace, values, option_string=None):
        """Store every ``KEY=VALUE`` item of ``values`` in the destination dict.

        Raises:
            argparse.ArgumentError: if an item has no ``=`` or an empty key;
                argparse reports this as a regular usage error.
        """
        # Work on a copy so that the dict passed as ``default=`` is never
        # mutated in place.
        collected = dict(getattr(namespace, self.dest, None) or {})
        for item in values:
            # Split on the first '=' only, so values may themselves contain '='
            key, separator, value = item.partition('=')
            if not separator or not key:
                raise argparse.ArgumentError(self, f'expected KEY=VALUE, got "{item}"')
            collected[key] = value
        setattr(namespace, self.dest, collected)


parser.register('action', 'keyvalue', KeyValueAction)

# Parse args
parser.add_argument('-a', '--attach', dest='attach', default=[], nargs=1, action='extend',
                    help='add attachment(s) to mail (default: None)')
parser.add_argument('-b', '--basedir', dest='basedir', default='Mails',
                    help='base directory for all output (default: "Mails")')
parser.add_argument('-B', '--bcc', dest='bcc', default=[], nargs=1, action='extend',
                    help='additional mail recipient(s) to bcc (default: None)')
parser.add_argument('-c', '--campaign', dest='campaign', default='Test', help='campaign name (default: "Test")') parser.add_argument('-C', '--cc', dest='cc', default=[], nargs=1, action='extend', help='additional mail recipient(s) to cc (default: None)') parser.add_argument('-d', '--dry-run', dest='dryrun', default=False, action='store_true', help='dry run -- do not actually send mails or create targets (default: False)') parser.add_argument('-f', '--from', dest='sender', default='Nobody <nobody@example.com>', help='sender mail address (default: "Nobody <nobody@example.com>"; implies dry-run if not set)') parser.add_argument('-F', '--force', dest='force', default=False, action='store_true', help='force insecure login without TLS/SSL (default: False)') parser.add_argument('-H', '--hashstring', dest='hashstring', default='{salt}{campaign}{infix}-{site}', help='string to be hashed for the URL (default: "{salt}{campaign}{infix}-{site}" where "{salt}" is a random string)') parser.add_argument('-i', '--input', dest='input', default='{basedir}/{campaign}/Input{infix}.lst', help='input file (default: "{basedir}/{campaign}/Input{infix}.lst")') parser.add_argument('-I', '--message-id', dest='messageid', default=False, action='store_true', help='create a message ID (default: False)') parser.add_argument('-o', '--output', dest='output', default='{basedir}/{campaign}/{site}/{timestamp}{infix}.eml', help='output file name template (default: "{basedir}/{campaign}/{site}/{timestamp}{infix}.eml")') parser.add_argument('-R', '--reply-to', dest='replyto', default=None, help='reply-to mail address (default: None)') parser.add_argument( '--salt', dest='salt', default=None, help='salt to use for hashing (default: random 8-byte hex string)') parser.add_argument( '--sign', dest='sign', default='', type=str.lower, choices=['', 'gpg', 'gpgsm'], help='signature method (default: ""); one of "", "gpg"') parser.add_argument( '--signpass', dest='signpass', default=None, help='Password for the 
signature key (default: none); will be queried interactively if set to "-"') parser.add_argument( '--sign-arg', dest='signarg', default={}, nargs=1, action='keyvalue', help='additional arguments to be passed to the signature call (default: None)') parser.add_argument( '--sign-init-arg', dest='signinitarg', default={}, nargs=1, action='keyvalue', help='additional arguments to be passed to the signature-mechanism init call (default: None)') parser.add_argument('-s', '--subject', dest='subject', default='Security Challenge for {site} -- {campaign}{infix}', help='mail subject (default: "Security Challenge Message -- {campaign}{infix}")') parser.add_argument('-S', '--smtpserver', dest='smtpserver', default='localhost', help='SMTP server to use (default: "localhost"); port can be specified with "<host>:<port>" notation and takes precedence over implied ports and port specification') parser.add_argument( '--smtpport', dest='smtpport', default=0, type=int, help='SMTP port to use (default: 25); takes precedence over implied ports') parser.add_argument( '--smtpuser', dest='smtpuser', default=None, help='SMTP user to login with (default: none); implies TLS (port 465) unless STARTTLS is set as well') parser.add_argument( '--smtppass', dest='smtppass', default=None, help='SMTP password to login with (default: none); implies TLS (port 465) unless STARTTLS is set as well; will be queried interactively if set to "-"') parser.add_argument( '--starttls', dest='starttls', default=False, action='store_true', help='login using STARTTLS (default: False); implies port 587') parser.add_argument('-t', '--template', dest='template', default='{basedir}/{campaign}/Mail.template', help='mail template file (default: "{basedir}/{campaign}/Mail.template")') parser.add_argument( '--timestamp', dest='timestamp', default='%Y-%m-%dT%H:%M:%SZ', help='timestamp format used for {timestamp} keyword (default: "%%Y-%%m-%%dT%%H:%%M:%%SZ")') parser.add_argument('-T', '--to', dest='to', default='{firstname} 
{lastname} <{email}>', help='recipient mail address (default: "{firstname} {lastname} <{email}>")') parser.add_argument('-u', '--url', dest='url', default='{webserver}/{campaign}{infix}-{hash}', help='URL template to use (default: "{webserver}/{campaign}{infix}-{hash}"') parser.add_argument('-U', '--createurl', dest='createurl', default='{webserver}/{campaign}{infix}-{hash}/create', help='URL template to use for creation URL (default: "{webserver}/{campaign}{infix}-{hash}/create"') parser.add_argument('-v', '--verbose', dest='verbose', default=False, action='store_true', help='increase verbosity') parser.add_argument('-w', '--webserver', dest='webserver', default='https://challenge.example.com', help='web server to use (default: "https://challenge.example.com"; implies dry-run if not set; to suppress web-hook calling, set to empty string ""') parser.add_argument('infix', default='', nargs='?', help='infix for ID purposes, default empty') args = parser.parse_args() class SafeDict(dict): def __missing__(self, key): return '{' + key + '}' if (args.sender == 'Nobody <nobody@example.com>') or \ (args.webserver == 'https://challenge.example.com'): args.dryrun = True # Import further necessary dependencies # Import dependencies for actual SMTP interaction if necessary if not args.dryrun: import smtplib # Import dependencies for HTTTP interaction if necessary if args.webserver: import requests # Import dependencies for attachment file-magic if necessary if args.attach: import magic # Import dependencies for mail signing if necessary if args.sign: import magic match args.sign: case 'gpg' | 'gpgsm': # Import dependencies for GPG-based mail signing if necessary from base64 import b64encode from email.mime.multipart import MIMEMultipart import gnupg case 'openssl': # Import dependencies for OpenSSL-based mail signing if necessary from M2Crypto import BIO, Rand, SMIME if (len(args.smtpserver.split(':')) == 2) or \ (']:' in args.smtpserver): args.smtpport = 
args.smtpserver.rsplit(':', 1)[1] args.smtpserver = args.smtpserver.rsplit(':', 1)[0] if (args.smtpport == 0): if args.starttls: args.smtpport = 587 elif args.smtpuser or args.smtppass: args.smtpport = 465 else: args.smtpport = 25 if args.smtpuser and not args.smtppass: args.smtppass = '' if args.smtppass and not args.smtpuser: args.smtpuser = '' if args.smtppass == '-': args.smtppass = getpass.getpass('SMTP authentication password: ') if args.signpass == '-': args.signpass = getpass.getpass('Signature key password: ') if args.signpass: args.signarg['passphrase'] = args.signpass if args.sign and 'keyid' not in args.signarg: args.signarg['keyid'] = args.sender if args.dryrun: print('DRY RUN. Not actually sending e-mails or creating targets, just creating mail files and URLs.') if args.force: print('FORCE INSECURE LOGIN. Will send authentication data even if no secure connection can be established.') if args.starttls: print('STARTTLS enabled.') elif args.verbose: print('STARTTLS disabled.') # Generate salt to use for secure hashing if unset if not args.salt: args.salt = secrets.token_hex(8) if args.verbose: if args.attach: print(f'Using "{", ".join(args.attach)}" as mail attachment(s).') print(f'Using "{args.basedir}" as base directory.') if args.bcc: print(f'Using "{", ".join(args.bcc)}" as bcc mail recipient(s).') print(f'Using "{args.campaign}" as campaign.') if args.cc: print(f'Using "{", ".join(args.cc)}" as cc mail recipient(s).') print(f'Using "{args.sender}" as sender mail address.') if args.replyto: print(f'Using "{args.replyto}" as reply-to mail address.') print(f'Using "{args.hashstring.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver))}" as hash string.') print(f'Using "{args.input.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as input file.') print(f'Using "{args.output.format_map(SafeDict(basedir=args.basedir, 
campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as output file name template.') print(f'Using "{args.salt.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as salt.') if args.sign: print(f'Using "{args.sign}" as signature method.') if args.signarg: print(f'Using "{str(args.signarg)}" as signing argument(s).') if args.signpass: print('Using "{args.signpass}" as password for signature key.') else: print(f'Using no signature method.') print(f'Using "{args.subject.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as mail subject.') print(f'Using "{args.template.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as template file.') print(f'Using "{args.timestamp.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as timestamp format.') print(f'Using "{args.to.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as recipient mail address.') print(f'Using "{args.url.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as URL template.') print(f'Using "{args.createurl.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as URL template.') print(f'Using "{args.webserver}" as web server.') print(f'Using "{args.infix}" as infix.') print(f'Using "{args.smtpserver}" as SMTP server.') print(f'Using {str(args.smtpport)} as SMTP port.') if args.smtpuser: print(f'Using "{args.smtpuser}" as user for SMTP authentication.') if args.smtppass: print('Using "{args.smtppass}" as password for SMTP authentication.') if 
args.messageid: print(f'Generating a message ID.') if (args.smtpuser or args.smtppass) and \ (args.smtpport == 25) and \ not args.starttls and \ not args.force: print('SMTP authentication specified, but port 25 used (without STARTTLS). Bailing out.') sys.exit(-1) elif args.force: print('WARNING: SMTP authentication specified, but port 25 used (without STARTTLS)!') data = dict() template = jinja2.Template(open(args.template.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))).read()) # Set up GPG context if necessary gpg = None if args.sign == 'gpg': gpg = gnupg.GPG(**args.signinitarg) elif args.sign == 'gpgsm': class GPGSM(gnupg.GPG): def __init__(self, gpgbinary='gpgsm', gnupghome=None, verbose=False, use_agent=False, keyring=None, options=None, secret_keyring=None, env=None): self.gpgbinary = gpgbinary self.gnupghome = gnupghome self.env = env if gnupghome and not os.path.isdir(gnupghome): raise ValueError('gnupghome should be a directory (it isn\'t): %s' % gnupghome) if keyring: if isinstance(keyring, string_types): keyring = [keyring] self.keyring = keyring if secret_keyring: # pragma: no cover if isinstance(secret_keyring, string_types): secret_keyring = [secret_keyring] self.secret_keyring = secret_keyring self.verbose = verbose self.use_agent = use_agent if isinstance(options, str): # pragma: no cover options = [options] self.options = options self.on_data = None # or a callable - will be called with data chunks self.encoding = 'latin-1' if gnupghome and not os.path.isdir(self.gnupghome): # pragma: no cover os.makedirs(self.gnupghome, 0o700) self.check_fingerprint_collisions = False def make_args(self, args, passphrase): cmd = ['gpgsm', '--status-fd', '2', '--no-tty', '--no-verbose'] if 'DEBUG_IPC' in os.environ: # pragma: no cover cmd.extend(['--debug', 'ipc']) if passphrase and hasattr(self, 'version'): if self.version >= (2, 1): cmd[1:1] = ['--pinentry-mode', 'loopback'] 
cmd.extend(['--batch', '--with-colons']) if self.gnupghome: cmd.extend(['--homedir', no_quote(self.gnupghome)]) if self.keyring: cmd.append('--no-default-keyring') for fn in self.keyring: cmd.extend(['--keyring', no_quote(fn)]) if self.secret_keyring: # pragma: no cover for fn in self.secret_keyring: cmd.extend(['--secret-keyring', no_quote(fn)]) if passphrase: cmd.extend(['--passphrase-fd', '0']) if self.use_agent: # pragma: no cover cmd.append('--use-agent') if self.options: cmd.extend(self.options) cmd.extend(args) return cmd gpg = GPGSM(gpgbinary='gpgsm', **args.signinitarg) args.signarg['binary'] = True # Convert int to hex def toHex(serial): tmpString = hex(serial)[2:].upper() if ((len(tmpString) % 2) == 1): tmpString = '0' + tmpString return ':'.join([tmpString[i:i+2] for i in range(0, len(tmpString), 2)]) # Sign message with GPG/GPGSM def signMailGPG(message): # Sign mail try: signature = str(gpg.sign(message.as_string().replace('\n', '\r\n'), detach=True, **args.signarg)) except: print('ERROR signing mail!') if not signature: print('ERROR signing mail!') raise Exception('Empty signature!') # Assemble signature message signatureMessage = EmailMessage() if args.sign == 'gpg': signatureMessage['Content-Type'] = 'application/pgp-signature; name="signature.asc"' signatureMessage['Content-Description'] = 'OpenPGP digital signature' else: signatureMessage['Content-Type'] = 'application/pkcs7-signature; name="smime.p7s"' signatureMessage['Content-Description'] = 'S/MIME digital signature' signatureMessage['Content-Transfer-Encoding'] = 'base64' signature = b64encode(signature.encode('latin1')) signatureMessage.set_payload(signature) # Assemble new message if args.sign == 'gpg': newMessage = MIMEMultipart(_subtype='signed', protocol='application/pgp-signature') else: newMessage = MIMEMultipart(_subtype='signed', protocol='application/pkcs7-signature') newMessage.attach(message) newMessage.attach(signatureMessage) return newMessage # Sign message with OpenSSL def 
signMailOpenSSL(message): return message # Sign message if requested def signMail(message): if not args.sign: return message match args.sign: case 'gpg' | 'gpgsm': return signMailGPG(message) case 'openssl': return signMailOpenSSL(message) # Assemble and send the actual mail def createMail(data): output = args.output.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) directory = os.path.dirname(output) # Generate email body and attachments message = MIMEText(template.render(data)) message.set_charset('utf-8') for attach in args.attach: with open(attach, 'rb') as attachment: message.add_attachment(attachment.read(), *magic.from_file(attach, mime=True).split('/')) # Sign mail if requested message = signMail(message) # Create message ID if requested if args.messageid: message['Message-Id'] = make_msgid() message['Date'] = formatdate(localtime=True) message['Subject'] = args.subject.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) message['From'] = args.sender.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) if args.replyto: message['Reply-To'] = args.replyto.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], 
email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) message['To'] = args.to.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) if args.cc: message['Cc'] = ', '.join(args.cc).format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) if args.bcc: message['Bcc'] = ', '.join(args.bcc).format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp'])) # Record mail print(f'Writing mail to {output} using hash {data["hash"]}') if not os.path.exists(directory): os.makedirs(directory) with open(output, 'w') as mailfile: Generator(mailfile).flatten(message) if not args.dryrun: # Actually send the mail print(f'Sending mail to {message["To"]}') if message['Cc']: print(f'... and cc to {message["Cc"]}') if message['Bcc']: print(f'... 
and bcc to {message["Bcc"]}') try: if args.starttls: with smtplib.SMTP(args.smtpserver, args.smtpport) as smtp: smtp.starttls() smtp.login(args.smtpuser, args.smtppass) smtp.send_message(message) elif args.smtpuser or args.smtppass: with smtplib.SMTP_SSL(args.smtpserver, args.smtpport) as smtp: smtp.login(args.smtpuser, args.smtppass) smtp.send_message(message) else: with smtplib.SMTP(args.smtpserver, args.smtpport) as smtp: smtp.send_message(message) except: print(f'ERROR sending the mail!') # Call the webhook if webserver is specified if args.webserver: print(f'Creating target at {data["CreateURL"]}') requests.get(f'{data["CreateURL"]}') # Generate site-specific data: hash, URL, CreateURL, timestamp def generateIndividualData(data): data['hash'] = hashlib.sha256(args.hashstring.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'])).encode('utf-8')).hexdigest() data['URL'] = args.url.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'])) data['CreateURL'] = args.createurl.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'])) data['timestamp'] = datetime.datetime.now(datetime.UTC).strftime(args.timestamp) # Main loop with open(args.input.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt)), 'r') as inputfile: data = csv.DictReader(inputfile, fieldnames=['site', 'firstname', 'lastname', 'email'], delimiter=',') for row in data: if (row['site'] != 'site') or \ 
(row['firstname'] != 'firstname') or \ (row['lastname'] != 'lastname') or \ (row['email'] != 'email'): generateIndividualData(row) createMail(row)