559 lines
24 KiB
Python
Executable File
559 lines
24 KiB
Python
Executable File
#! /usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
|
|
# $Id: profiles_tests.py 3254 2007-06-05 21:23:57Z fpeters $
|
|
#
|
|
# Python unit tests for Lasso library
|
|
#
|
|
# Copyright (C) 2004-2007 Entr'ouvert
|
|
# http://lasso.entrouvert.org
|
|
#
|
|
# Authors: See AUTHORS file in top-level directory.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
import os
|
|
import unittest
|
|
import sys
|
|
|
|
if not '..' in sys.path:
|
|
sys.path.insert(0, '..')
|
|
if not '../.libs' in sys.path:
|
|
sys.path.insert(0, '../.libs')
|
|
|
|
import lasso
|
|
import logging
|
|
|
|
logging.basicConfig()
|
|
|
|
|
|
try:
|
|
dataDir
|
|
except NameError:
|
|
srcdir = os.environ.get('TOP_SRCDIR', '.')
|
|
dataDir = '%s/tests/data' % srcdir
|
|
|
|
|
|
def server(local_name, remote_role, remote_name):
|
|
pwd = os.path.join(dataDir, local_name, 'password')
|
|
password = None
|
|
if os.path.exists(pwd):
|
|
password = open(pwd).read()
|
|
s = lasso.Server(
|
|
os.path.join(dataDir, local_name, 'metadata.xml'),
|
|
os.path.join(dataDir, local_name, 'private-key.pem'),
|
|
password)
|
|
s.addProvider(remote_role, os.path.join(dataDir, remote_name, 'metadata.xml'))
|
|
return s
|
|
|
|
|
|
class ServerTestCase(unittest.TestCase):
|
|
def test01(self):
|
|
"""Server construction, dump & newFromDump."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
dump = lassoServer.dump()
|
|
lassoServer2 = lassoServer.newFromDump(dump)
|
|
dump2 = lassoServer2.dump()
|
|
self.assertEqual(dump, dump2)
|
|
|
|
def test02(self):
|
|
"""Server construction without argument, dump & newFromDump."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'))
|
|
dump = lassoServer.dump()
|
|
lassoServer2 = lassoServer.newFromDump(dump)
|
|
dump2 = lassoServer2.dump()
|
|
self.assertEqual(dump, dump2)
|
|
|
|
|
|
class LoginTestCase(unittest.TestCase):
|
|
def test01(self):
|
|
"""SP login; testing access to authentication request."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
login = lasso.Login(lassoServer)
|
|
login.initAuthnRequest()
|
|
login.request
|
|
login.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
|
|
self.assertEqual(login.request.protocolProfile, lasso.LIB_PROTOCOL_PROFILE_BRWS_ART)
|
|
|
|
def test02(self):
|
|
"""SP login; testing processing of an empty Response."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
login = lasso.Login(lassoServer)
|
|
try:
|
|
login.processResponseMsg('')
|
|
except lasso.Error as error:
|
|
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
|
|
raise
|
|
|
|
def test03(self):
|
|
"""Conversion of a lib:AuthnRequest with an AuthnContext into a query and back."""
|
|
|
|
sp = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
sp.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
spLogin = lasso.Login(sp)
|
|
spLogin.initAuthnRequest()
|
|
requestAuthnContext = lasso.LibRequestAuthnContext()
|
|
authnContextClassRefsList = []
|
|
authnContextClassRefsList.append(
|
|
lasso.LIB_AUTHN_CONTEXT_CLASS_REF_PASSWORD)
|
|
requestAuthnContext.authnContextClassRef = tuple(authnContextClassRefsList)
|
|
spLogin.request.requestAuthnContext = requestAuthnContext
|
|
spLogin.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
|
|
spLogin.buildAuthnRequestMsg()
|
|
authnRequestQuery = spLogin.msgUrl[spLogin.msgUrl.index('?') + 1:]
|
|
idp = lasso.Server(
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
idp.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
idpLogin = lasso.Login(idp)
|
|
idpLogin.processAuthnRequestMsg(authnRequestQuery)
|
|
self.assertTrue(idpLogin.request.requestAuthnContext)
|
|
authnContextClassRefsList = idpLogin.request.requestAuthnContext.authnContextClassRef
|
|
self.assertEqual(len(authnContextClassRefsList), 1)
|
|
self.assertEqual(authnContextClassRefsList[0], lasso.LIB_AUTHN_CONTEXT_CLASS_REF_PASSWORD)
|
|
|
|
def test04(self):
|
|
"""Conversion of a lib:AuthnRequest with extensions into a query and back."""
|
|
|
|
sp = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
sp.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
spLogin = lasso.Login(sp)
|
|
spLogin.initAuthnRequest()
|
|
extensionList = []
|
|
for extension in (
|
|
'<action>do</action>',
|
|
'<action2>do action 2</action2><action3>do action 3</action3>'):
|
|
extensionList.append(
|
|
'<lib:Extension xmlns:lib="urn:liberty:iff:2003-08">%s</lib:Extension>'
|
|
% extension)
|
|
spLogin.request.extension = tuple(extensionList)
|
|
spLogin.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
|
|
spLogin.buildAuthnRequestMsg()
|
|
authnRequestQuery = spLogin.msgUrl[spLogin.msgUrl.index('?') + 1:]
|
|
idp = lasso.Server(
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
idp.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
idpLogin = lasso.Login(idp)
|
|
idpLogin.processAuthnRequestMsg(authnRequestQuery)
|
|
self.assertTrue(idpLogin.request.extension)
|
|
extensionsList = idpLogin.request.extension
|
|
self.assertEqual(len(extensionsList), 1)
|
|
self.assertTrue('<action>do</action>' in extensionsList[0])
|
|
self.assertTrue('<action2>do action 2</action2>' in extensionsList[0])
|
|
self.assertTrue('<action3>do action 3</action3>' in extensionsList[0])
|
|
|
|
def test05(self):
|
|
'''SAMLv2 Authn request emitted and received using Artifact binding'''
|
|
sp = lasso.Server(
|
|
os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
|
|
os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
|
|
assert sp
|
|
sp.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp5-saml2/metadata.xml'))
|
|
sp_login = lasso.Login(sp)
|
|
assert sp_login
|
|
sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_ARTIFACT_GET)
|
|
sp_login.buildAuthnRequestMsg()
|
|
sp_login_dump = sp_login.dump()
|
|
idp = lasso.Server(
|
|
os.path.join(dataDir, 'idp5-saml2/metadata.xml'),
|
|
os.path.join(dataDir, 'idp5-saml2/private-key.pem'))
|
|
idp.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
|
|
idp_login = lasso.Login(idp)
|
|
idp_login.initRequest(sp_login.msgUrl.split('?')[1], lasso.HTTP_METHOD_ARTIFACT_GET)
|
|
idp_login.buildRequestMsg()
|
|
sp_login2 = lasso.Login.newFromDump(sp, sp_login_dump)
|
|
assert isinstance(sp_login2, lasso.Login)
|
|
assert idp_login.msgBody
|
|
sp_login2.processRequestMsg(idp_login.msgBody)
|
|
sp_login2.buildResponseMsg()
|
|
assert sp_login2.msgBody
|
|
try:
|
|
idp_login.processResponseMsg(sp_login2.msgBody)
|
|
except Exception:
|
|
raise
|
|
assert isinstance(idp_login.request, lasso.Samlp2AuthnRequest)
|
|
|
|
def test_06(self):
|
|
'''Login test between SP and IdP with encrypted private keys'''
|
|
sp_server = server('sp7-saml2', lasso.PROVIDER_ROLE_IDP, 'idp7-saml2')
|
|
idp_server = server('idp7-saml2', lasso.PROVIDER_ROLE_SP, 'sp7-saml2')
|
|
|
|
sp_login = lasso.Login(sp_server)
|
|
sp_login.initAuthnRequest()
|
|
sp_login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
|
|
sp_login.buildAuthnRequestMsg()
|
|
idp_login = lasso.Login(idp_server)
|
|
idp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
|
|
idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
|
|
idp_login.validateRequestMsg(True, True)
|
|
idp_login.buildAssertion("None", "None", "None", "None", "None")
|
|
idp_login.buildAuthnResponseMsg()
|
|
sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
|
|
sp_login.processAuthnResponseMsg(idp_login.msgBody)
|
|
sp_login.acceptSso()
|
|
|
|
def test07(self):
|
|
'''SAMLv2 SSO with DSA key for the IdP'''
|
|
default_sign_meth = lasso.getDefaultSignatureMethod()
|
|
if default_sign_meth != lasso.SIGNATURE_METHOD_RSA_SHA1:
|
|
self.skipTest("This test requires that lasso is compiled with SHA1 as the default signature method")
|
|
|
|
sp = lasso.Server(
|
|
os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
|
|
os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
|
|
assert sp
|
|
sp.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp12-dsa-saml2/metadata.xml'))
|
|
sp_login = lasso.Login(sp)
|
|
assert sp_login
|
|
sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_REDIRECT)
|
|
sp_login.buildAuthnRequestMsg()
|
|
idp = lasso.Server(
|
|
os.path.join(dataDir, 'idp12-dsa-saml2/metadata.xml'),
|
|
os.path.join(dataDir, 'idp12-dsa-saml2/private-key.pem'))
|
|
idp.signatureMethod = lasso.SIGNATURE_METHOD_DSA_SHA1
|
|
idp.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
|
|
idp_login = lasso.Login(idp)
|
|
idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
|
|
idp_login.protocolProfile = lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST
|
|
idp_login.validateRequestMsg(True, True)
|
|
idp_login.buildAssertion("None", "None", "None", "None", "None")
|
|
idp_login.buildAuthnResponseMsg()
|
|
|
|
def test08(self):
|
|
'''Verify KeyEncryptionMethod support'''
|
|
sp_server = server('sp5-saml2', lasso.PROVIDER_ROLE_IDP, 'idp5-saml2')
|
|
idp_server = server('idp5-saml2', lasso.PROVIDER_ROLE_SP, 'sp5-saml2')
|
|
|
|
def run(key_encryption_method=None):
|
|
sp_login = lasso.Login(sp_server)
|
|
sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_REDIRECT)
|
|
sp_login.buildAuthnRequestMsg()
|
|
|
|
provider = idp_server.getProvider('http://sp5/metadata')
|
|
provider.setEncryptionMode(lasso.ENCRYPTION_MODE_ASSERTION)
|
|
|
|
if key_encryption_method:
|
|
provider.setKeyEncryptionMethod(key_encryption_method)
|
|
|
|
idp_login = lasso.Login(idp_server)
|
|
idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
|
|
idp_login.protocolProfile = lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST
|
|
idp_login.validateRequestMsg(True, True)
|
|
idp_login.buildAssertion("None", "None", "None", "None", "None")
|
|
idp_login.buildAuthnResponseMsg()
|
|
|
|
sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
|
|
sp_login.processAuthnResponseMsg(idp_login.msgBody)
|
|
sp_login.acceptSso()
|
|
return sp_login.response.debug()
|
|
|
|
os.environ['LASSO_DEFAULT_KEY_ENCRYPTION_METHOD'] = 'rsa-pkcs1'
|
|
lasso.init()
|
|
assert 'xmlenc#rsa-1_5' in run()
|
|
assert 'xmlenc#rsa-oaep-mgf1p' not in run()
|
|
|
|
os.environ['LASSO_DEFAULT_KEY_ENCRYPTION_METHOD'] = 'rsa-oaep'
|
|
lasso.init()
|
|
assert 'xmlenc#rsa-1_5' not in run()
|
|
assert 'xmlenc#rsa-oaep-mgf1p' in run()
|
|
|
|
lasso.setDefaultKeyEncryptionMethod(lasso.KEY_ENCRYPTION_METHOD_PKCS1)
|
|
assert 'xmlenc#rsa-1_5' in run()
|
|
assert 'xmlenc#rsa-oaep-mgf1p' not in run()
|
|
|
|
lasso.setDefaultKeyEncryptionMethod(lasso.KEY_ENCRYPTION_METHOD_OAEP)
|
|
assert 'xmlenc#rsa-1_5' not in run()
|
|
assert 'xmlenc#rsa-oaep-mgf1p' in run()
|
|
|
|
assert 'xmlenc#rsa-1_5' in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_PKCS1)
|
|
assert 'xmlenc#rsa-oaep-mgf1p' not in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_PKCS1)
|
|
|
|
assert 'xmlenc#rsa-1_5' not in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_OAEP)
|
|
assert 'xmlenc#rsa-oaep-mgf1p' in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_OAEP)
|
|
|
|
|
|
class LogoutTestCase(unittest.TestCase):
|
|
def test01(self):
|
|
"""SP logout without session and identity; testing initRequest."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_IDP,
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
logout = lasso.Logout(lassoServer)
|
|
try:
|
|
logout.initRequest()
|
|
except lasso.Error as error:
|
|
if error[0] != lasso.PROFILE_ERROR_SESSION_NOT_FOUND:
|
|
raise
|
|
else:
|
|
self.fail('logout.initRequest without having set identity before should fail')
|
|
|
|
def test02(self):
|
|
"""IDP logout without session and identity; testing logout.getNextProviderId."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
logout = lasso.Logout(lassoServer)
|
|
self.assertFalse(logout.getNextProviderId())
|
|
|
|
def test03(self):
|
|
"""IDP logout; testing processRequestMsg with non Liberty query."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
logout = lasso.Logout(lassoServer)
|
|
# The processRequestMsg should fail but not abort.
|
|
try:
|
|
logout.processRequestMsg('passport=0&lasso=1')
|
|
except lasso.Error as error:
|
|
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
|
|
raise
|
|
else:
|
|
self.fail('Logout processRequestMsg should have failed.')
|
|
|
|
def test04(self):
|
|
"""IDP logout; testing processResponseMsg with non Liberty query."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
logout = lasso.Logout(lassoServer)
|
|
# The processResponseMsg should fail but not abort.
|
|
try:
|
|
logout.processResponseMsg('liberty=&alliance')
|
|
except lasso.Error as error:
|
|
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
|
|
raise
|
|
else:
|
|
self.fail('Logout processResponseMsg should have failed.')
|
|
|
|
def test05(self):
|
|
'''Test parsing of a logout request with more than one session index'''
|
|
content = '''<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" \
|
|
xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" ID="xxxx" Version="2.0" IssueInstant="2010-06-14T22:00:00">
|
|
<saml:Issuer>me</saml:Issuer>
|
|
<saml:NameID>coin</saml:NameID>
|
|
<samlp:SessionIndex>id1</samlp:SessionIndex>
|
|
<samlp:SessionIndex>id2</samlp:SessionIndex>
|
|
<samlp:SessionIndex>id3</samlp:SessionIndex>
|
|
</samlp:LogoutRequest>'''
|
|
|
|
node = lasso.Samlp2LogoutRequest.newFromXmlNode(content)
|
|
assert isinstance(node, lasso.Samlp2LogoutRequest)
|
|
assert node.sessionIndex == 'id1'
|
|
assert node.sessionIndexes == ('id1', 'id2', 'id3')
|
|
|
|
|
|
class DefederationTestCase(unittest.TestCase):
|
|
def test01(self):
|
|
"""IDP initiated defederation; testing processNotificationMsg with non Liberty query."""
|
|
|
|
lassoServer = lasso.Server(
|
|
os.path.join(dataDir, 'idp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
|
|
None,
|
|
os.path.join(dataDir, 'idp1-la/certificate.pem'))
|
|
lassoServer.addProvider(
|
|
lasso.PROVIDER_ROLE_SP,
|
|
os.path.join(dataDir, 'sp1-la/metadata.xml'),
|
|
os.path.join(dataDir, 'sp1-la/public-key.pem'),
|
|
os.path.join(dataDir, 'sp1-la/certificate.pem'))
|
|
defederation = lasso.Defederation(lassoServer)
|
|
# The processNotificationMsg should fail but not abort.
|
|
try:
|
|
defederation.processNotificationMsg('nonLibertyQuery=1')
|
|
except lasso.Error as error:
|
|
if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
|
|
raise
|
|
else:
|
|
self.fail('Defederation processNotificationMsg should have failed.')
|
|
|
|
|
|
class IdentityTestCase(unittest.TestCase):
|
|
def test01(self):
|
|
"""Identity newFromDump & dump."""
|
|
return
|
|
# test disabled since dump format changed
|
|
identityDump = """<Identity xmlns="http://www.entrouvert.org/namespaces/lasso/0.0" Version="1"><Federations><Federation xmlns="http://www.entrouvert.org/namespaces/lasso/0.0" Version="1" RemoteProviderID="https://sp1.entrouvert.lan/metadata"><LocalNameIdentifier><saml:NameIdentifier xmlns:saml="urn:oasis:names:tc:SAML:1.0:assertion" NameQualifier="https://proxy2.entrouvert.lan/metadata" Format="urn:liberty:iff:nameid:federated">_CD739B41C602EAEA93626EBD1751CB46</saml:NameIdentifier></LocalNameIdentifier></Federation><Federation xmlns="http://www.entrouvert.org/namespaces/lasso/0.0" Version="1" RemoteProviderID="https://idp1.entrouvert.lan/metadata"><RemoteNameIdentifier><saml:NameIdentifier xmlns:saml="urn:oasis:names:tc:SAML:1.0:assertion" NameQualifier="https://idp1.entrouvert.lan/metadata" Format="urn:liberty:iff:nameid:federated">_11EA77A4FED32C41824AC5DE87298E65</saml:NameIdentifier></RemoteNameIdentifier></Federation></Federations></Identity>"""
|
|
identity = lasso.Identity.newFromDump(identityDump)
|
|
newIdentityDump = identity.dump()
|
|
self.assertEqual(identityDump, newIdentityDump)
|
|
|
|
class AttributeAuthorityTestCase(unittest.TestCase):
|
|
def test01(self):
|
|
'''Attribute request and response test between sp5 and idp6'''
|
|
s = lasso.Server(
|
|
os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
|
|
os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
|
|
s.addProvider(lasso.PROVIDER_ROLE_ATTRIBUTE_AUTHORITY, os.path.join(dataDir, 'idp6-saml2/metadata.xml'))
|
|
|
|
s2 = lasso.Server(
|
|
os.path.join(dataDir, 'idp6-saml2/metadata.xml'),
|
|
os.path.join(dataDir, 'idp6-saml2/private-key.pem'))
|
|
s2.addProvider(lasso.PROVIDER_ROLE_SP, os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
|
|
|
|
aq = lasso.AssertionQuery(s)
|
|
rpid = list(s.providers.keys())[0]
|
|
aq.initRequest(rpid, lasso.HTTP_METHOD_SOAP, lasso.ASSERTION_QUERY_REQUEST_TYPE_ATTRIBUTE)
|
|
assert aq.request
|
|
assert aq.remoteProviderId == rpid
|
|
nid = lasso.Saml2NameID.newWithPersistentFormat(
|
|
lasso.buildUniqueId(32),
|
|
s.providerId, s2.providerId)
|
|
aq.nameIdentifier = nid
|
|
aq.addAttributeRequest(lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC, 'testAttribute')
|
|
aq.buildRequestMsg()
|
|
assert aq.msgUrl
|
|
assert aq.msgBody
|
|
|
|
aq2 = lasso.AssertionQuery(s2)
|
|
aq2.processRequestMsg(aq.msgBody)
|
|
assert aq.request
|
|
aq2.validateRequest()
|
|
assert aq2.response
|
|
assertion = lasso.Saml2Assertion()
|
|
aq2.response.assertion = (assertion, )
|
|
for attribute in aq2.request.attribute:
|
|
content = lasso.MiscTextNode.newWithString("xxx")
|
|
content.textChild = True
|
|
assertion.addAttributeWithNode(attribute.name, attribute.nameFormat, content)
|
|
assertion.addAttributeWithNode(attribute.name, attribute.nameFormat, content)
|
|
assertion.subject = aq.request.subject
|
|
s2.saml2AssertionSetupSignature(assertion)
|
|
aq2.buildResponseMsg()
|
|
aq.processResponseMsg(aq2.msgBody)
|
|
assert aq.response
|
|
assert aq.response.assertion[0]
|
|
assert aq.response.assertion[0].attributeStatement[0]
|
|
assert aq.response.assertion[0].attributeStatement[0].attribute[0]
|
|
assert aq.response.assertion[0].attributeStatement[0].attribute[0].attributeValue[0]
|
|
|
|
serverSuite = unittest.makeSuite(ServerTestCase, 'test')
|
|
loginSuite = unittest.makeSuite(LoginTestCase, 'test')
|
|
logoutSuite = unittest.makeSuite(LogoutTestCase, 'test')
|
|
defederationSuite = unittest.makeSuite(DefederationTestCase, 'test')
|
|
identitySuite = unittest.makeSuite(IdentityTestCase, 'test')
|
|
attributeSuite = unittest.makeSuite(AttributeAuthorityTestCase, 'test')
|
|
|
|
allTests = unittest.TestSuite((serverSuite, loginSuite, logoutSuite, defederationSuite,
|
|
identitySuite, attributeSuite))
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(not unittest.TextTestRunner(verbosity=2).run(allTests).wasSuccessful())
|
|
|