Skip to content
Snippets Groups Projects
Commit f2914e4c authored by Michal Hažlinský's avatar Michal Hažlinský :family:
Browse files

Merge remote-tracking branch 'surfnet/master' into stable

parents 3e87ac23 e08721d3
No related branches found
No related tags found
No related merge requests found
...@@ -3,3 +3,4 @@ _trial_temp ...@@ -3,3 +3,4 @@ _trial_temp
build build
dist dist
.opennsa-test.json .opennsa-test.json
.python-version
...@@ -31,7 +31,7 @@ NORDUnet License (3-clause BSD). See LICENSE for more details. ...@@ -31,7 +31,7 @@ NORDUnet License (3-clause BSD). See LICENSE for more details.
#### Contact #### Contact
* Henrik Thostrup Jensen htj <at> nordu.net * Johannes Garm Houen - jgh @ nordu.net
#### Copyright #### Copyright
......
...@@ -130,11 +130,11 @@ def doMain(): ...@@ -130,11 +130,11 @@ def doMain():
if public_key or private_key or certificate_dir: if public_key or private_key or certificate_dir:
if public_key == '.' and private_key == '.': if public_key == '.' and private_key == '.':
from opennsa import ctxfactory from opennsa.opennsaTlsContext import opennsaTlsContext
ctx_factory = ctxfactory.RequestContextFactory(certificate_dir, verify_cert) ctx_factory = opennsaTlsContext(certificate_dir, verify_cert)
elif public_key and private_key and certificate_dir: elif public_key and private_key and certificate_dir:
from opennsa import ctxfactory from opennsa.opennsaTlsContext import opennsa2WayTlsContext
ctx_factory = ctxfactory.ContextFactory(private_key, public_key, certificate_dir, verify_cert) ctx_factory = opennsa2WayTlsContext(private_key, public_key, certificate_dir, verify_cert)
elif tls: elif tls:
if not public_key: if not public_key:
raise usage.UsageError('Cannot setup TLS. No public key defined') raise usage.UsageError('Cannot setup TLS. No public key defined')
......
"""
SSL/TLS context definition.
Most of this code is borrowed from the SGAS 3.X LUTS codebase.
NORDUnet holds the copyright for SGAS 3.X LUTS and OpenNSA.
"""
import os
from OpenSSL import SSL
from twisted.python import log
LOG_SYSTEM = 'CTXFactory'
class RequestContextFactory:
"""
Context Factory for issuing requests to SSL/TLS services without having
a client certificate.
"""
def __init__(self, certificate_dir, verify):
self.certificate_dir = certificate_dir
self.verify = verify
self.ctx = None
def getContext(self):
if self.ctx is not None:
return self.ctx
else:
self.ctx = self._createContext()
return self.ctx
def _createContext(self):
def verify_callback(conn, x509, error_number, error_depth, allowed):
# just return what openssl thinks is right
if self.verify:
return allowed # return what openssl thinks is right
else:
return 1 # allow everything which has a cert
# The way to support tls 1.0 and forward is to use the SSLv23 method
# (which means everything) and then disable ssl2 and ssl3
# Not pretty, but it works
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.set_options(SSL.OP_NO_SSLv2)
ctx.set_options(SSL.OP_NO_SSLv3)
# disable tls session id, as the twisted tls protocol seems to break on them
ctx.set_session_cache_mode(SSL.SESS_CACHE_OFF)
ctx.set_options(SSL.OP_NO_TICKET)
ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
calist = [ ca for ca in os.listdir(self.certificate_dir) if ca.endswith('.0') ]
if len(calist) == 0 and self.verify:
log.msg('No certificiates loaded for CTX verificiation. CA verification will not work.', system=LOG_SYSTEM)
for ca in calist:
# openssl wants absolute paths
ca = os.path.join(self.certificate_dir, ca)
ctx.load_verify_locations(ca)
return ctx
class ContextFactory(RequestContextFactory):
"""
Full context factory with private key and cert. When running service
over SSL/TLS.
"""
def __init__(self, private_key_path, public_key_path, certificate_dir, verify):
RequestContextFactory.__init__(self, certificate_dir, verify)
self.private_key_path = private_key_path
self.public_key_path = public_key_path
def _createContext(self):
ctx = RequestContextFactory._createContext(self)
ctx.use_privatekey_file(self.private_key_path)
ctx.use_certificate_chain_file(self.public_key_path)
ctx.check_privatekey() # sanity check
return ctx
#!/usr/bin/env python
"""
SSL/TLS context definition.
Most of this code is borrowed from the SGAS 3.X LUTS codebase.
NORDUnet holds the copyright for SGAS 3.X LUTS and OpenNSA.
With contributions from Hans Trompert (SURF BV)
"""
from OpenSSL import crypto, SSL
from os import listdir, path
from sys import stdout
from twisted.internet import ssl
from twisted.python import log
from twisted.python.filepath import FilePath
LOG_SYSTEM = 'opennsaTlsContext'
class opennsaTlsContext:
"""
Context to be used while issuing requests to SSL/TLS services without having
a client certificate.
"""
def __init__(self, certificate_dir, verify):
self.certificate_dir = certificate_dir
self.verify = verify
self._trustRoot = self._createTrustRootFromCADirectory(certificate_dir)
self._extraCertificateOptions = {
'enableSessions': False,
'enableSessionTickets': False,
'raiseMinimumTo': ssl.TLSVersion.TLSv1_2,
'fixBrokenPeers': True
}
def _createTrustRootFromCADirectory(self, certificate_dir):
CACertificates = []
for CAFilename in listdir(certificate_dir):
if not CAFilename.endswith('.0'):
continue
CAFileContent = FilePath(certificate_dir).child(CAFilename).getContent()
try:
CACertificates.append(ssl.Certificate.loadPEM(CAFileContent))
except crypto.Error as error:
log.msg('Cannot load CA certificate from %s: %s' % (CAFilename, error), system = LOG_SYSTEM)
else:
log.msg('Loaded CA certificate commonName %s' % (str(CACertificates[-1].getSubject().commonName)), system = LOG_SYSTEM)
if len(CACertificates) == 0:
print('No certificiates loaded for CTX verificiation. CA verification will not work.')
return ssl.trustRootFromCertificates(CACertificates)
def getTrustRoot(self):
return self._trustRoot
def getExtraCertificateOptions(self):
return self._extraCertificateOptions
def getClientTLSOptions(self, hostname):
if(not self.verify):
log.msg('httpClient ignores verify=false, WILL verify certificate chain for %s against certdir' % (hostname), system = LOG_SYSTEM)
return ssl.optionsForClientTLS(hostname, trustRoot=self._trustRoot, extraCertificateOptions=self._extraCertificateOptions)
def getContext(self):
if self.ctx is None:
self.ctx = self.createOpenSSLContext()
return self.ctx
def createOpenSSLContext(self):
log.msg('creating OpenSSL SSL Context ...', system=LOG_SYSTEM)
def verify_callback(conn, x509, error_number, error_depth, allowed):
# just return what openssl thinks is right
if self.verify:
return allowed # return what openssl thinks is right
else:
return 1 # allow everything which has a cert
# The way to support tls 1.0 and forward is to use the SSLv23 method
# (which means everything) and then disable ssl2 and ssl3
# Not pretty, but it works
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.set_options(SSL.OP_NO_SSLv2)
ctx.set_options(SSL.OP_NO_SSLv3)
# disable tls session id, as the twisted tls protocol seems to break on them
ctx.set_session_cache_mode(SSL.SESS_CACHE_OFF)
ctx.set_options(SSL.OP_NO_TICKET)
ctx.set_verify(SSL.VERIFY_PEER, verify_callback)
calist = [ ca for ca in listdir(self.certificate_dir) if ca.endswith('.0') ]
if len(calist) == 0 and self.verify:
log.msg('No certificiates loaded for CTX verificiation. CA verification will not work.', system=LOG_SYSTEM)
for ca in calist:
# openssl wants absolute paths
ca = path.join(self.certificate_dir, ca)
ctx.load_verify_locations(ca)
return ctx
class opennsa2WayTlsContext(opennsaTlsContext):
"""
Full context with private key and certificate when running service
over SSL/TLS.
"""
def __init__(self, private_key_path, public_key_path, certificate_dir, verify):
self.private_key_path = private_key_path
self.public_key_path = public_key_path
self.ctx = None
opennsaTlsContext.__init__(self, certificate_dir, verify)
keyContent = FilePath(private_key_path).getContent()
certificateContent = FilePath(public_key_path).getContent()
self._clientCertificate = ssl.PrivateCertificate.loadPEM(keyContent + certificateContent)
def getClientCertificate(self):
return self._clientCertificate
def getPrivateKey(self):
return self.getClientCertificate().privateKey.original
def getCertificate(self):
return self.getClientCertificate().original
def getClientTLSOptions(self, hostname):
if(not self.verify):
log.msg('httpClient ignores verify=false, WILL verify certificate chain for %s against certdir' % (hostname), system = LOG_SYSTEM)
return ssl.optionsForClientTLS(hostname, trustRoot=self._trustRoot, clientCertificate=self._clientCertificate, extraCertificateOptions=self._extraCertificateOptions)
def getContext(self):
if self.ctx is None:
self.ctx = self.createOpenSSLContext()
return self.ctx
def createOpenSSLContext(self):
self.ctx = opennsaTlsContext.createOpenSSLContext(self)
log.msg('adding key and certificate to OpenSSL SSL Context ...', system=LOG_SYSTEM)
self.ctx.use_privatekey_file(self.private_key_path)
self.ctx.use_certificate_chain_file(self.public_key_path)
self.ctx.check_privatekey() # sanity check
return self.ctx
def main():
log.startLogging(stdout)
opennsaContext = opennsa2WayTlsContext('server.key', 'server.crt', 'trusted_ca_s', False)
log.msg('trustRoot = %s' % opennsaContext.getTrustRoot(), system = LOG_SYSTEM)
log.msg('extraCertificateOptions = %s' % opennsaContext.getExtraCertificateOptions(), system = LOG_SYSTEM)
log.msg('clientCertificate = %s' % opennsaContext.getClientCertificate().getSubject(), system = LOG_SYSTEM)
log.msg('OpenSSLContext = %s' % opennsaContext.getContext(), system = LOG_SYSTEM)
log.msg('ClientTLSOptions = %s' % opennsaContext.getClientTLSOptions('some.hostname'), system = LOG_SYSTEM)
if __name__ == "__main__":
main()
...@@ -78,7 +78,7 @@ def httpRequest(url, payload, headers, method=b'POST', timeout=DEFAULT_TIMEOUT, ...@@ -78,7 +78,7 @@ def httpRequest(url, payload, headers, method=b'POST', timeout=DEFAULT_TIMEOUT,
if scheme == b'https': if scheme == b'https':
if ctx_factory is None: if ctx_factory is None:
return defer.fail(HTTPRequestError('Cannot perform https request without context factory')) return defer.fail(HTTPRequestError('Cannot perform https request without context factory'))
reactor.connectSSL(host, port, factory, ctx_factory) reactor.connectSSL(host, port, factory, ctx_factory.getClientTLSOptions(host.decode()))
else: else:
reactor.connectTCP(host, port, factory) reactor.connectTCP(host, port, factory)
......
...@@ -103,14 +103,14 @@ def setupTLSContext(vc): ...@@ -103,14 +103,14 @@ def setupTLSContext(vc):
# ssl/tls contxt # ssl/tls contxt
if vc[config.TLS]: if vc[config.TLS]:
from opennsa import ctxfactory from opennsa.opennsaTlsContext import opennsa2WayTlsContext
ctx_factory = ctxfactory.ContextFactory(vc[config.KEY], vc[config.CERTIFICATE], vc[config.CERTIFICATE_DIR], vc[config.VERIFY_CERT]) ctx_factory = opennsa2WayTlsContext(vc[config.KEY], vc[config.CERTIFICATE], vc[config.CERTIFICATE_DIR], vc[config.VERIFY_CERT])
elif vc[config.CERTIFICATE_DIR]: elif vc[config.CERTIFICATE_DIR]:
# create a context so we can verify https urls # create a context so we can verify https urls
if not os.path.isdir(vc[config.CERTIFICATE_DIR]): if not os.path.isdir(vc[config.CERTIFICATE_DIR]):
raise config.ConfigurationError('certdir value {} is not a directory'.format(vc[config.CERTIFICATE_DIR])) raise config.ConfigurationError('certdir value {} is not a directory'.format(vc[config.CERTIFICATE_DIR]))
from opennsa import ctxfactory from opennsa.opennsaTlsContext import opennsaTlsContext
ctx_factory = ctxfactory.RequestContextFactory(vc[config.CERTIFICATE_DIR], vc[config.VERIFY_CERT]) ctx_factory = opennsaTlsContext(vc[config.CERTIFICATE_DIR], vc[config.VERIFY_CERT])
else: else:
ctx_factory = None ctx_factory = None
......
...@@ -3,3 +3,5 @@ twistar>=2.0 ...@@ -3,3 +3,5 @@ twistar>=2.0
psycopg2>=2.7,<2.8 --no-binary psycopg2 psycopg2>=2.7,<2.8 --no-binary psycopg2
pyOpenSSL>=17.5.0 pyOpenSSL>=17.5.0
python-dateutil python-dateutil
service_identity
idna
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment