-
Tobias Dussa authoredTobias Dussa authored
createMails.py 23.66 KiB
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# Reaction Mailcreate
# Copyright (C) 2020-2024 Tobias Dussa <tobias-reaction-mailcreate@dussa.de>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from email.generator import Generator
from email.message import EmailMessage
from email.mime.text import MIMEText
from email.utils import formatdate, make_msgid
import argparse
import csv
import datetime
import getpass
import hashlib
import jinja2
import os
import secrets
import sys
if not sys.version_info >= (3, 11):
sys.exit('ERROR: This script requires Python 3.11 or better.')
parser = argparse.ArgumentParser()
class KeyValueAction(argparse.Action):
# Constructor calling
def __call__( self , parser, namespace,
values, option_string = None):
for value in values:
# split it into key and value
key, value = value.split('=')
# assign into dictionary
getattr(namespace, self.dest)[key] = value
parser.register('action', 'keyvalue', KeyValueAction)
# Parse args
parser.add_argument('-a', '--attach', dest='attach', default=[], nargs=1, action='extend', help='add attachment(s) to mail (default: None)')
parser.add_argument('-b', '--basedir', dest='basedir', default='Mails', help='base directory for all output (default: "Mails")')
parser.add_argument('-B', '--bcc', dest='bcc', default=[], nargs=1, action='extend', help='additional mail recipient(s) to bcc (default: None)')
parser.add_argument('-c', '--campaign', dest='campaign', default='Test', help='campaign name (default: "Test")')
parser.add_argument('-C', '--cc', dest='cc', default=[], nargs=1, action='extend', help='additional mail recipient(s) to cc (default: None)')
parser.add_argument('-d', '--dry-run', dest='dryrun', default=False, action='store_true', help='dry run -- do not actually send mails or create targets (default: False)')
parser.add_argument('-f', '--from', dest='sender', default='Nobody <nobody@example.com>', help='sender mail address (default: "Nobody <nobody@example.com>"; implies dry-run if not set)')
parser.add_argument('-F', '--force', dest='force', default=False, action='store_true', help='force insecure login without TLS/SSL (default: False)')
parser.add_argument('-H', '--hashstring', dest='hashstring', default='{salt}{campaign}{infix}-{site}', help='string to be hashed for the URL (default: "{salt}{campaign}{infix}-{site}" where "{salt}" is a random string)')
parser.add_argument('-i', '--input', dest='input', default='{basedir}/{campaign}/Input{infix}.lst', help='input file (default: "{basedir}/{campaign}/Input{infix}.lst")')
parser.add_argument('-I', '--message-id', dest='messageid', default=False, action='store_true', help='create a message ID (default: False)')
parser.add_argument('-o', '--output', dest='output', default='{basedir}/{campaign}/{site}/{timestamp}{infix}.eml', help='output file name template (default: "{basedir}/{campaign}/{site}/{timestamp}{infix}.eml")')
parser.add_argument('-R', '--reply-to', dest='replyto', default=None, help='reply-to mail address (default: None)')
parser.add_argument( '--salt', dest='salt', default=None, help='salt to use for hashing (default: random 8-byte hex string)')
parser.add_argument( '--sign', dest='sign', default='', type=str.lower, choices=['', 'gpg', 'gpgsm'], help='signature method (default: ""); one of "", "gpg"')
parser.add_argument( '--signpass', dest='signpass', default=None, help='Password for the signature key (default: none); will be queried interactively if set to "-"')
parser.add_argument( '--sign-arg', dest='signarg', default={}, nargs=1, action='keyvalue', help='additional arguments to be passed to the signature call (default: None)')
parser.add_argument( '--sign-init-arg', dest='signinitarg', default={}, nargs=1, action='keyvalue', help='additional arguments to be passed to the signature-mechanism init call (default: None)')
parser.add_argument('-s', '--subject', dest='subject', default='Security Challenge for {site} -- {campaign}{infix}', help='mail subject (default: "Security Challenge Message -- {campaign}{infix}")')
parser.add_argument('-S', '--smtpserver', dest='smtpserver', default='localhost', help='SMTP server to use (default: "localhost"); port can be specified with "<host>:<port>" notation and takes precedence over implied ports and port specification')
parser.add_argument( '--smtpport', dest='smtpport', default=0, type=int, help='SMTP port to use (default: 25); takes precedence over implied ports')
parser.add_argument( '--smtpuser', dest='smtpuser', default=None, help='SMTP user to login with (default: none); implies TLS (port 465) unless STARTTLS is set as well')
parser.add_argument( '--smtppass', dest='smtppass', default=None, help='SMTP password to login with (default: none); implies TLS (port 465) unless STARTTLS is set as well; will be queried interactively if set to "-"')
parser.add_argument( '--starttls', dest='starttls', default=False, action='store_true', help='login using STARTTLS (default: False); implies port 587')
parser.add_argument('-t', '--template', dest='template', default='{basedir}/{campaign}/Mail.template', help='mail template file (default: "{basedir}/{campaign}/Mail.template")')
parser.add_argument( '--timestamp', dest='timestamp', default='%Y-%m-%dT%H:%M:%SZ', help='timestamp format used for {timestamp} keyword (default: "%%Y-%%m-%%dT%%H:%%M:%%SZ")')
parser.add_argument('-T', '--to', dest='to', default='{firstname} {lastname} <{email}>', help='recipient mail address (default: "{firstname} {lastname} <{email}>")')
parser.add_argument('-u', '--url', dest='url', default='{webserver}/{campaign}{infix}-{hash}', help='URL template to use (default: "{webserver}/{campaign}{infix}-{hash}"')
parser.add_argument('-U', '--createurl', dest='createurl', default='{webserver}/{campaign}{infix}-{hash}/create', help='URL template to use for creation URL (default: "{webserver}/{campaign}{infix}-{hash}/create"')
parser.add_argument('-v', '--verbose', dest='verbose', default=False, action='store_true', help='increase verbosity')
parser.add_argument('-w', '--webserver', dest='webserver', default='https://challenge.example.com', help='web server to use (default: "https://challenge.example.com"; implies dry-run if not set; to suppress web-hook calling, set to empty string ""')
parser.add_argument('infix', default='', nargs='?', help='infix for ID purposes, default empty')
args = parser.parse_args()
class SafeDict(dict):
def __missing__(self, key):
return '{' + key + '}'
if (args.sender == 'Nobody <nobody@example.com>') or \
(args.webserver == 'https://challenge.example.com'):
args.dryrun = True
# Import further necessary dependencies
# Import dependencies for actual SMTP interaction if necessary
if not args.dryrun:
import smtplib
# Import dependencies for HTTTP interaction if necessary
if args.webserver:
import requests
# Import dependencies for attachment file-magic if necessary
if args.attach:
import magic
# Import dependencies for mail signing if necessary
if args.sign:
import magic
match args.sign:
case 'gpg' | 'gpgsm':
# Import dependencies for GPG-based mail signing if necessary
from base64 import b64encode
from email.mime.multipart import MIMEMultipart
import gnupg
case 'openssl':
# Import dependencies for OpenSSL-based mail signing if necessary
from M2Crypto import BIO, Rand, SMIME
if (len(args.smtpserver.split(':')) == 2) or \
(']:' in args.smtpserver):
args.smtpport = args.smtpserver.rsplit(':', 1)[1]
args.smtpserver = args.smtpserver.rsplit(':', 1)[0]
if (args.smtpport == 0):
if args.starttls:
args.smtpport = 587
elif args.smtpuser or args.smtppass:
args.smtpport = 465
else:
args.smtpport = 25
if args.smtpuser and not args.smtppass:
args.smtppass = ''
if args.smtppass and not args.smtpuser:
args.smtpuser = ''
if args.smtppass == '-':
args.smtppass = getpass.getpass('SMTP authentication password: ')
if args.signpass == '-':
args.signpass = getpass.getpass('Signature key password: ')
if args.signpass:
args.signarg['passphrase'] = args.signpass
if args.sign and 'keyid' not in args.signarg:
args.signarg['keyid'] = args.sender
if args.dryrun:
print('DRY RUN. Not actually sending e-mails or creating targets, just creating mail files and URLs.')
if args.force:
print('FORCE INSECURE LOGIN. Will send authentication data even if no secure connection can be established.')
if args.starttls:
print('STARTTLS enabled.')
elif args.verbose:
print('STARTTLS disabled.')
# Generate salt to use for secure hashing if unset
if not args.salt:
args.salt = secrets.token_hex(8)
if args.verbose:
if args.attach:
print(f'Using "{", ".join(args.attach)}" as mail attachment(s).')
print(f'Using "{args.basedir}" as base directory.')
if args.bcc:
print(f'Using "{", ".join(args.bcc)}" as bcc mail recipient(s).')
print(f'Using "{args.campaign}" as campaign.')
if args.cc:
print(f'Using "{", ".join(args.cc)}" as cc mail recipient(s).')
print(f'Using "{args.sender}" as sender mail address.')
if args.replyto:
print(f'Using "{args.replyto}" as reply-to mail address.')
print(f'Using "{args.hashstring.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver))}" as hash string.')
print(f'Using "{args.input.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as input file.')
print(f'Using "{args.output.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as output file name template.')
print(f'Using "{args.salt.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as salt.')
if args.sign:
print(f'Using "{args.sign}" as signature method.')
if args.signarg:
print(f'Using "{str(args.signarg)}" as signing argument(s).')
if args.signpass:
print('Using "{args.signpass}" as password for signature key.')
else:
print(f'Using no signature method.')
print(f'Using "{args.subject.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as mail subject.')
print(f'Using "{args.template.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as template file.')
print(f'Using "{args.timestamp.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as timestamp format.')
print(f'Using "{args.to.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as recipient mail address.')
print(f'Using "{args.url.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as URL template.')
print(f'Using "{args.createurl.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))}" as URL template.')
print(f'Using "{args.webserver}" as web server.')
print(f'Using "{args.infix}" as infix.')
print(f'Using "{args.smtpserver}" as SMTP server.')
print(f'Using {str(args.smtpport)} as SMTP port.')
if args.smtpuser:
print(f'Using "{args.smtpuser}" as user for SMTP authentication.')
if args.smtppass:
print('Using "{args.smtppass}" as password for SMTP authentication.')
if args.messageid:
print(f'Generating a message ID.')
if (args.smtpuser or args.smtppass) and \
(args.smtpport == 25) and \
not args.starttls and \
not args.force:
print('SMTP authentication specified, but port 25 used (without STARTTLS). Bailing out.')
sys.exit(-1)
elif args.force:
print('WARNING: SMTP authentication specified, but port 25 used (without STARTTLS)!')
data = dict()
template = jinja2.Template(open(args.template.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt))).read())
# Set up GPG context if necessary
gpg = None
if args.sign == 'gpg':
gpg = gnupg.GPG(**args.signinitarg)
elif args.sign == 'gpgsm':
class GPGSM(gnupg.GPG):
def __init__(self,
gpgbinary='gpgsm',
gnupghome=None,
verbose=False,
use_agent=False,
keyring=None,
options=None,
secret_keyring=None,
env=None):
self.gpgbinary = gpgbinary
self.gnupghome = gnupghome
self.env = env
if gnupghome and not os.path.isdir(gnupghome):
raise ValueError('gnupghome should be a directory (it isn\'t): %s' % gnupghome)
if keyring:
if isinstance(keyring, string_types):
keyring = [keyring]
self.keyring = keyring
if secret_keyring: # pragma: no cover
if isinstance(secret_keyring, string_types):
secret_keyring = [secret_keyring]
self.secret_keyring = secret_keyring
self.verbose = verbose
self.use_agent = use_agent
if isinstance(options, str): # pragma: no cover
options = [options]
self.options = options
self.on_data = None # or a callable - will be called with data chunks
self.encoding = 'latin-1'
if gnupghome and not os.path.isdir(self.gnupghome): # pragma: no cover
os.makedirs(self.gnupghome, 0o700)
self.check_fingerprint_collisions = False
def make_args(self, args, passphrase):
cmd = ['gpgsm', '--status-fd', '2', '--no-tty', '--no-verbose']
if 'DEBUG_IPC' in os.environ: # pragma: no cover
cmd.extend(['--debug', 'ipc'])
if passphrase and hasattr(self, 'version'):
if self.version >= (2, 1):
cmd[1:1] = ['--pinentry-mode', 'loopback']
cmd.extend(['--batch', '--with-colons'])
if self.gnupghome:
cmd.extend(['--homedir', no_quote(self.gnupghome)])
if self.keyring:
cmd.append('--no-default-keyring')
for fn in self.keyring:
cmd.extend(['--keyring', no_quote(fn)])
if self.secret_keyring: # pragma: no cover
for fn in self.secret_keyring:
cmd.extend(['--secret-keyring', no_quote(fn)])
if passphrase:
cmd.extend(['--passphrase-fd', '0'])
if self.use_agent: # pragma: no cover
cmd.append('--use-agent')
if self.options:
cmd.extend(self.options)
cmd.extend(args)
return cmd
gpg = GPGSM(gpgbinary='gpgsm', **args.signinitarg)
args.signarg['binary'] = True
# Convert int to hex
def toHex(serial):
tmpString = hex(serial)[2:].upper()
if ((len(tmpString) % 2) == 1):
tmpString = '0' + tmpString
return ':'.join([tmpString[i:i+2] for i in range(0, len(tmpString), 2)])
# Sign message with GPG/GPGSM
def signMailGPG(message):
# Sign mail
try:
signature = str(gpg.sign(message.as_string().replace('\n', '\r\n'), detach=True, **args.signarg))
except:
print('ERROR signing mail!')
if not signature:
print('ERROR signing mail!')
raise Exception('Empty signature!')
# Assemble signature message
signatureMessage = EmailMessage()
if args.sign == 'gpg':
signatureMessage['Content-Type'] = 'application/pgp-signature; name="signature.asc"'
signatureMessage['Content-Description'] = 'OpenPGP digital signature'
else:
signatureMessage['Content-Type'] = 'application/pkcs7-signature; name="smime.p7s"'
signatureMessage['Content-Description'] = 'S/MIME digital signature'
signatureMessage['Content-Transfer-Encoding'] = 'base64'
signature = b64encode(signature.encode('latin1'))
signatureMessage.set_payload(signature)
# Assemble new message
if args.sign == 'gpg':
newMessage = MIMEMultipart(_subtype='signed', protocol='application/pgp-signature')
else:
newMessage = MIMEMultipart(_subtype='signed', protocol='application/pkcs7-signature')
newMessage.attach(message)
newMessage.attach(signatureMessage)
return newMessage
# Sign message with OpenSSL
def signMailOpenSSL(message):
return message
# Sign message if requested
def signMail(message):
if not args.sign:
return message
match args.sign:
case 'gpg' | 'gpgsm':
return signMailGPG(message)
case 'openssl':
return signMailOpenSSL(message)
# Assemble and send the actual mail
def createMail(data):
output = args.output.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
directory = os.path.dirname(output)
# Generate email body and attachments
message = MIMEText(template.render(data))
message.set_charset('utf-8')
for attach in args.attach:
with open(attach, 'rb') as attachment:
message.add_attachment(attachment.read(), *magic.from_file(attach, mime=True).split('/'))
# Sign mail if requested
message = signMail(message)
# Create message ID if requested
if args.messageid:
message['Message-Id'] = make_msgid()
message['Date'] = formatdate(localtime=True)
message['Subject'] = args.subject.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
message['From'] = args.sender.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
if args.replyto:
message['Reply-To'] = args.replyto.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
message['To'] = args.to.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
if args.cc:
message['Cc'] = ', '.join(args.cc).format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
if args.bcc:
message['Bcc'] = ', '.join(args.bcc).format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash'], URL=data['URL'], timestamp=data['timestamp']))
# Record mail
print(f'Writing mail to {output} using hash {data["hash"]}')
if not os.path.exists(directory):
os.makedirs(directory)
with open(output, 'w') as mailfile:
Generator(mailfile).flatten(message)
if not args.dryrun:
# Actually send the mail
print(f'Sending mail to {message["To"]}')
if message['Cc']:
print(f'... and cc to {message["Cc"]}')
if message['Bcc']:
print(f'... and bcc to {message["Bcc"]}')
try:
if args.starttls:
with smtplib.SMTP(args.smtpserver, args.smtpport) as smtp:
smtp.starttls()
smtp.login(args.smtpuser, args.smtppass)
smtp.send_message(message)
elif args.smtpuser or args.smtppass:
with smtplib.SMTP_SSL(args.smtpserver, args.smtpport) as smtp:
smtp.login(args.smtpuser, args.smtppass)
smtp.send_message(message)
else:
with smtplib.SMTP(args.smtpserver, args.smtpport) as smtp:
smtp.send_message(message)
except:
print(f'ERROR sending the mail!')
# Call the webhook if webserver is specified
if args.webserver:
print(f'Creating target at {data["CreateURL"]}')
requests.get(f'{data["CreateURL"]}')
# Generate site-specific data: hash, URL, CreateURL, timestamp
def generateIndividualData(data):
data['hash'] = hashlib.sha256(args.hashstring.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'])).encode('utf-8')).hexdigest()
data['URL'] = args.url.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash']))
data['CreateURL'] = args.createurl.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt, site=data['site'], firstname=data['firstname'], lastname=data['lastname'], email=data['email'], hash=data['hash']))
data['timestamp'] = datetime.datetime.now(datetime.UTC).strftime(args.timestamp)
# Main loop
with open(args.input.format_map(SafeDict(basedir=args.basedir, campaign=args.campaign, infix=args.infix, webserver=args.webserver, salt=args.salt)), 'r') as inputfile:
data = csv.DictReader(inputfile, fieldnames=['site', 'firstname', 'lastname', 'email'], delimiter=',')
for row in data:
if (row['site'] != 'site') or \
(row['firstname'] != 'firstname') or \
(row['lastname'] != 'lastname') or \
(row['email'] != 'email'):
generateIndividualData(row)
createMail(row)