¡@

Home 

OpenStack Study: test_keystoneclient_sql.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import os

import uuid

from keystoneclient.contrib.ec2 import utils as ec2_utils

from keystone import config

from keystone import tests

from keystone.tests import test_keystoneclient

CONF = config.CONF

**** CubicPower OpenStack Study ****

class KcMasterSqlTestCase(test_keystoneclient.KcMasterTestCase):

**** CubicPower OpenStack Study ****

    def config_files(self):

        config_files = super(KcMasterSqlTestCase, self).config_files()

        config_files.append(tests.dirs.tests_conf('backend_sql.conf'))

        return config_files

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(KcMasterSqlTestCase, self).setUp()

        self.default_client = self.get_client()

        self.addCleanup(self.cleanup_instance('default_client'))

**** CubicPower OpenStack Study ****

    def test_endpoint_crud(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        service = client.services.create(name=uuid.uuid4().hex,

                                         service_type=uuid.uuid4().hex,

                                         description=uuid.uuid4().hex)

        endpoint_region = uuid.uuid4().hex

        invalid_service_id = uuid.uuid4().hex

        endpoint_publicurl = uuid.uuid4().hex

        endpoint_internalurl = uuid.uuid4().hex

        endpoint_adminurl = uuid.uuid4().hex

        # a non-existent service ID should trigger a 404

        self.assertRaises(client_exceptions.NotFound,

                          client.endpoints.create,

                          region=endpoint_region,

                          service_id=invalid_service_id,

                          publicurl=endpoint_publicurl,

                          adminurl=endpoint_adminurl,

                          internalurl=endpoint_internalurl)

        endpoint = client.endpoints.create(region=endpoint_region,

                                           service_id=service.id,

                                           publicurl=endpoint_publicurl,

                                           adminurl=endpoint_adminurl,

                                           internalurl=endpoint_internalurl)

        self.assertEqual(endpoint.region, endpoint_region)

        self.assertEqual(endpoint.service_id, service.id)

        self.assertEqual(endpoint.publicurl, endpoint_publicurl)

        self.assertEqual(endpoint.internalurl, endpoint_internalurl)

        self.assertEqual(endpoint.adminurl, endpoint_adminurl)

        client.endpoints.delete(id=endpoint.id)

        self.assertRaises(client_exceptions.NotFound, client.endpoints.delete,

                          id=endpoint.id)

**** CubicPower OpenStack Study ****

    def _send_ec2_auth_request(self, credentials, client=None):

        if not client:

            client = self.default_client

        url = '%s/ec2tokens' % self.default_client.auth_url

        (resp, token) = client.request(

            url=url, method='POST',

            body={'credentials': credentials})

        return resp, token

**** CubicPower OpenStack Study ****

    def _generate_default_user_ec2_credentials(self):

        cred = self. default_client.ec2.create(

            user_id=self.user_foo['id'],

            tenant_id=self.tenant_bar['id'])

        return self._generate_user_ec2_credentials(cred.access, cred.secret)

**** CubicPower OpenStack Study ****

    def _generate_user_ec2_credentials(self, access, secret):

        signer = ec2_utils.Ec2Signer(secret)

        credentials = {'params': {'SignatureVersion': '2'},

                       'access': access,

                       'verb': 'GET',

                       'host': 'localhost',

                       'path': '/service/cloud'}

        signature = signer.generate(credentials)

        return credentials, signature

**** CubicPower OpenStack Study ****

    def test_ec2_auth_success(self):

        credentials, signature = self._generate_default_user_ec2_credentials()

        credentials['signature'] = signature

        resp, token = self._send_ec2_auth_request(credentials)

        self.assertEqual(resp.status_code, 200)

        self.assertIn('access', token)

**** CubicPower OpenStack Study ****

    def test_ec2_auth_success_trust(self):

        # Add "other" role user_foo and create trust delegating it to user_two

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'],

            self.tenant_bar['id'],

            self.role_other['id'])

        trust_id = 'atrust123'

        trust = {'trustor_user_id': self.user_foo['id'],

                 'trustee_user_id': self.user_two['id'],

                 'project_id': self.tenant_bar['id'],

                 'impersonation': True}

        roles = [self.role_other]

        self.trust_api.create_trust(trust_id, trust, roles)

        # Create a client for user_two, scoped to the trust

        client = self.get_client(self.user_two)

        ret = client.authenticate(trust_id=trust_id,

                                  tenant_id=self.tenant_bar['id'])

        self.assertTrue(ret)

        self.assertTrue(client.auth_ref.trust_scoped)

        self.assertEqual(trust_id, client.auth_ref.trust_id)

        # Create an ec2 keypair using the trust client impersonating user_foo

        cred = client.ec2.create(user_id=self.user_foo['id'],

                                 tenant_id=self.tenant_bar['id'])

        credentials, signature = self._generate_user_ec2_credentials(

            cred.access, cred.secret)

        credentials['signature'] = signature

        resp, token = self._send_ec2_auth_request(credentials)

        self.assertEqual(resp.status_code, 200)

        self.assertEqual(trust_id, token['access']['trust']['id'])

        #TODO(shardy) we really want to check the roles and trustee

        # but because of where the stubbing happens we don't seem to

        # hit the necessary code in controllers.py _authenticate_token

        # so although all is OK via a real request, it incorrect in

        # this test..

**** CubicPower OpenStack Study ****

    def test_ec2_auth_failure(self):

        from keystoneclient import exceptions as client_exceptions

        credentials, signature = self._generate_default_user_ec2_credentials()

        credentials['signature'] = uuid.uuid4().hex

        self.assertRaises(client_exceptions.Unauthorized,

                          self._send_ec2_auth_request,

                          credentials)

**** CubicPower OpenStack Study ****

    def test_ec2_credential_crud(self):

        creds = self.default_client.ec2.list(user_id=self.user_foo['id'])

        self.assertEqual(creds, [])

        cred = self.default_client.ec2.create(user_id=self.user_foo['id'],

                                              tenant_id=self.tenant_bar['id'])

        creds = self.default_client.ec2.list(user_id=self.user_foo['id'])

        self.assertEqual(creds, [cred])

        got = self.default_client.ec2.get(user_id=self.user_foo['id'],

                                          access=cred.access)

        self.assertEqual(cred, got)

        self.default_client.ec2.delete(user_id=self.user_foo['id'],

                                       access=cred.access)

        creds = self.default_client.ec2.list(user_id=self.user_foo['id'])

        self.assertEqual(creds, [])

**** CubicPower OpenStack Study ****

    def test_ec2_credential_crud_non_admin(self):

        na_client = self.get_client(self.user_two)

        creds = na_client.ec2.list(user_id=self.user_two['id'])

        self.assertEqual(creds, [])

        cred = na_client.ec2.create(user_id=self.user_two['id'],

                                    tenant_id=self.tenant_baz['id'])

        creds = na_client.ec2.list(user_id=self.user_two['id'])

        self.assertEqual(creds, [cred])

        got = na_client.ec2.get(user_id=self.user_two['id'],

                                access=cred.access)

        self.assertEqual(cred, got)

        na_client.ec2.delete(user_id=self.user_two['id'],

                             access=cred.access)

        creds = na_client.ec2.list(user_id=self.user_two['id'])

        self.assertEqual(creds, [])

**** CubicPower OpenStack Study ****

    def test_ec2_list_credentials(self):

        cred_1 = self.default_client.ec2.create(

            user_id=self.user_foo['id'],

            tenant_id=self.tenant_bar['id'])

        cred_2 = self.default_client.ec2.create(

            user_id=self.user_foo['id'],

            tenant_id=self.tenant_service['id'])

        cred_3 = self.default_client.ec2.create(

            user_id=self.user_foo['id'],

            tenant_id=self.tenant_mtu['id'])

        two = self.get_client(self.user_two)

        cred_4 = two.ec2.create(user_id=self.user_two['id'],

                                tenant_id=self.tenant_bar['id'])

        creds = self.default_client.ec2.list(user_id=self.user_foo['id'])

        self.assertEqual(len(creds), 3)

        self.assertEqual(sorted([cred_1, cred_2, cred_3],

                                key=lambda x: x.access),

                         sorted(creds, key=lambda x: x.access))

        self.assertNotIn(cred_4, creds)

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_create_404(self):

        from keystoneclient import exceptions as client_exceptions

        self.assertRaises(client_exceptions.NotFound,

                          self.default_client.ec2.create,

                          user_id=uuid.uuid4().hex,

                          tenant_id=self.tenant_bar['id'])

        self.assertRaises(client_exceptions.NotFound,

                          self.default_client.ec2.create,

                          user_id=self.user_foo['id'],

                          tenant_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        self.assertRaises(client_exceptions.NotFound,

                          self.default_client.ec2.delete,

                          user_id=uuid.uuid4().hex,

                          access=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_get_404(self):

        from keystoneclient import exceptions as client_exceptions

        self.assertRaises(client_exceptions.NotFound,

                          self.default_client.ec2.get,

                          user_id=uuid.uuid4().hex,

                          access=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_list_404(self):

        from keystoneclient import exceptions as client_exceptions

        self.assertRaises(client_exceptions.NotFound,

                          self.default_client.ec2.list,

                          user_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_list_user_forbidden(self):

        from keystoneclient import exceptions as client_exceptions

        two = self.get_client(self.user_two)

        self.assertRaises(client_exceptions.Forbidden, two.ec2.list,

                          user_id=self.user_foo['id'])

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_get_user_forbidden(self):

        from keystoneclient import exceptions as client_exceptions

        cred = self.default_client.ec2.create(user_id=self.user_foo['id'],

                                              tenant_id=self.tenant_bar['id'])

        two = self.get_client(self.user_two)

        self.assertRaises(client_exceptions.Forbidden, two.ec2.get,

                          user_id=self.user_foo['id'], access=cred.access)

        self.default_client.ec2.delete(user_id=self.user_foo['id'],

                                       access=cred.access)

**** CubicPower OpenStack Study ****

    def test_ec2_credentials_delete_user_forbidden(self):

        from keystoneclient import exceptions as client_exceptions

        cred = self.default_client.ec2.create(user_id=self.user_foo['id'],

                                              tenant_id=self.tenant_bar['id'])

        two = self.get_client(self.user_two)

        self.assertRaises(client_exceptions.Forbidden, two.ec2.delete,

                          user_id=self.user_foo['id'], access=cred.access)

        self.default_client.ec2.delete(user_id=self.user_foo['id'],

                                       access=cred.access)

**** CubicPower OpenStack Study ****

    def test_endpoint_create_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.endpoints.create,

                          region=uuid.uuid4().hex,

                          service_id=uuid.uuid4().hex,

                          publicurl=uuid.uuid4().hex,

                          adminurl=uuid.uuid4().hex,

                          internalurl=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_endpoint_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.endpoints.delete,

                          id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_policy_crud(self):

        # FIXME(dolph): this test was written prior to the v3 implementation of

        #               the client and essentially refers to a non-existent

        #               policy manager in the v2 client. this test needs to be

        #               moved to a test suite running against the v3 api

        self.skipTest('Written prior to v3 client; needs refactor')

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        policy_blob = uuid.uuid4().hex

        policy_type = uuid.uuid4().hex

        service = client.services.create(

            name=uuid.uuid4().hex,

            service_type=uuid.uuid4().hex,

            description=uuid.uuid4().hex)

        endpoint = client.endpoints.create(

            service_id=service.id,

            region=uuid.uuid4().hex,

            adminurl=uuid.uuid4().hex,

            internalurl=uuid.uuid4().hex,

            publicurl=uuid.uuid4().hex)

        # create

        policy = client.policies.create(

            blob=policy_blob,

            type=policy_type,

            endpoint=endpoint.id)

        self.assertEqual(policy_blob, policy.policy)

        self.assertEqual(policy_type, policy.type)

        self.assertEqual(endpoint.id, policy.endpoint_id)

        policy = client.policies.get(policy=policy.id)

        self.assertEqual(policy_blob, policy.policy)

        self.assertEqual(policy_type, policy.type)

        self.assertEqual(endpoint.id, policy.endpoint_id)

        endpoints = [x for x in client.endpoints.list() if x.id == endpoint.id]

        endpoint = endpoints[0]

        self.assertEqual(policy_blob, policy.policy)

        self.assertEqual(policy_type, policy.type)

        self.assertEqual(endpoint.id, policy.endpoint_id)

        # update

        policy_blob = uuid.uuid4().hex

        policy_type = uuid.uuid4().hex

        endpoint = client.endpoints.create(

            service_id=service.id,

            region=uuid.uuid4().hex,

            adminurl=uuid.uuid4().hex,

            internalurl=uuid.uuid4().hex,

            publicurl=uuid.uuid4().hex)

        policy = client.policies.update(

            policy=policy.id,

            blob=policy_blob,

            type=policy_type,

            endpoint=endpoint.id)

        policy = client.policies.get(policy=policy.id)

        self.assertEqual(policy_blob, policy.policy)

        self.assertEqual(policy_type, policy.type)

        self.assertEqual(endpoint.id, policy.endpoint_id)

        # delete

        client.policies.delete(policy=policy.id)

        self.assertRaises(

            client_exceptions.NotFound,

            client.policies.get,

            policy=policy.id)

        policies = [x for x in client.policies.list() if x.id == policy.id]

        self.assertEqual(len(policies), 0)

**** CubicPower OpenStack Study ****

class KcOptTestCase(KcMasterSqlTestCase):

# Set KSCTEST_PATH to the keystoneclient directory, then run this test.

#

# For example, to test your local keystoneclient,

#

# KSCTEST_PATH=/opt/stack/python-keystoneclient \

# tox -e py27 test_keystoneclient_sql.KcOptTestCase

**** CubicPower OpenStack Study ****

    def setUp(self):

        self.checkout_info = os.environ.get('KSCTEST_PATH')

        if not self.checkout_info:

            self.skip('Set KSCTEST_PATH env to test with local client')

        super(KcOptTestCase, self).setUp()