¡@

Home 

OpenStack Study: test_backend.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import copy

import datetime

import hashlib

import mock

import uuid

import six

from testtools import matchers

from keystone.catalog import core

from keystone.common import driver_hints

from keystone import config

from keystone import exception

from keystone.openstack.common import timeutils

from keystone import tests

from keystone.tests import default_fixtures

from keystone.tests import filtering

from keystone.tests import test_utils

from keystone.token import provider

CONF = config.CONF

DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id

TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'

NULL_OBJECT = object()

**** CubicPower OpenStack Study ****

class IdentityTests(object):

**** CubicPower OpenStack Study ****

    def _get_domain_fixture(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain['id'], domain)

        return domain

**** CubicPower OpenStack Study ****

    def test_project_add_and_remove_user_role(self):

        user_ids = self.assignment_api.list_user_ids_for_project(

            self.tenant_bar['id'])

        self.assertNotIn(self.user_two['id'], user_ids)

        self.assignment_api.add_role_to_user_and_project(

            tenant_id=self.tenant_bar['id'],

            user_id=self.user_two['id'],

            role_id=self.role_other['id'])

        user_ids = self.assignment_api.list_user_ids_for_project(

            self.tenant_bar['id'])

        self.assertIn(self.user_two['id'], user_ids)

        self.assignment_api.remove_role_from_user_and_project(

            tenant_id=self.tenant_bar['id'],

            user_id=self.user_two['id'],

            role_id=self.role_other['id'])

        user_ids = self.assignment_api.list_user_ids_for_project(

            self.tenant_bar['id'])

        self.assertNotIn(self.user_two['id'], user_ids)

**** CubicPower OpenStack Study ****

    def test_remove_user_role_not_assigned(self):

        # Expect failure if attempt to remove a role that was never assigned to

        # the user.

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.

                          remove_role_from_user_and_project,

                          tenant_id=self.tenant_bar['id'],

                          user_id=self.user_two['id'],

                          role_id=self.role_other['id'])

**** CubicPower OpenStack Study ****

    def test_authenticate_bad_user(self):

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id=uuid.uuid4().hex,

                          password=self.user_foo['password'])

**** CubicPower OpenStack Study ****

    def test_authenticate_bad_password(self):

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id=self.user_foo['id'],

                          password=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_authenticate(self):

        user_ref = self.identity_api.authenticate(

            context={},

            user_id=self.user_sna['id'],

            password=self.user_sna['password'])

        # NOTE(termie): the password field is left in user_sna to make

        #               it easier to authenticate in tests, but should

        #               not be returned by the api

        self.user_sna.pop('password')

        self.user_sna['enabled'] = True

        self.assertDictEqual(user_ref, self.user_sna)

**** CubicPower OpenStack Study ****

    def test_authenticate_and_get_roles_no_metadata(self):

        user = {

            'id': 'no_meta',

            'name': 'NO_META',

            'domain_id': DEFAULT_DOMAIN_ID,

            'password': 'no_meta2',

        }

        self.identity_api.create_user(user['id'], user)

        self.assignment_api.add_user_to_project(self.tenant_baz['id'],

                                                user['id'])

        user_ref = self.identity_api.authenticate(

            context={},

            user_id=user['id'],

            password=user['password'])

        self.assertNotIn('password', user_ref)

        # NOTE(termie): the password field is left in user_sna to make

        #               it easier to authenticate in tests, but should

        #               not be returned by the api

        user.pop('password')

        self.assertDictContainsSubset(user, user_ref)

        role_list = self.assignment_api.get_roles_for_user_and_project(

            user['id'], self.tenant_baz['id'])

        self.assertEqual(len(role_list), 1)

        self.assertIn(CONF.member_role_id, role_list)

**** CubicPower OpenStack Study ****

    def test_authenticate_if_no_password_set(self):

        id_ = uuid.uuid4().hex

        user = {

            'id': id_,

            'name': uuid.uuid4().hex,

            'domain_id': DEFAULT_DOMAIN_ID,

        }

        self.identity_api.create_user(user['id'], user)

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id=id_,

                          password='password')

**** CubicPower OpenStack Study ****

    def test_password_hashed(self):

        driver = self.identity_api._select_identity_driver(

            self.user_foo['domain_id'])

        user_ref = driver._get_user(self.user_foo['id'])

        self.assertNotEqual(user_ref['password'], self.user_foo['password'])

**** CubicPower OpenStack Study ****

    def test_create_unicode_user_name(self):

        unicode_name = u'name \u540d\u5b57'

        user = {'id': uuid.uuid4().hex,

                'name': unicode_name,

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': uuid.uuid4().hex}

        ref = self.identity_api.create_user(user['id'], user)

        self.assertEqual(unicode_name, ref['name'])

**** CubicPower OpenStack Study ****

    def test_get_project(self):

        tenant_ref = self.assignment_api.get_project(self.tenant_bar['id'])

        self.assertDictEqual(tenant_ref, self.tenant_bar)

**** CubicPower OpenStack Study ****

    def test_get_project_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_project,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_get_project_by_name(self):

        tenant_ref = self.assignment_api.get_project_by_name(

            self.tenant_bar['name'],

            DEFAULT_DOMAIN_ID)

        self.assertDictEqual(tenant_ref, self.tenant_bar)

**** CubicPower OpenStack Study ****

    def test_get_project_by_name_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_project_by_name,

                          uuid.uuid4().hex,

                          DEFAULT_DOMAIN_ID)

**** CubicPower OpenStack Study ****

    def test_list_user_ids_for_project(self):

        user_ids = self.assignment_api.list_user_ids_for_project(

            self.tenant_baz['id'])

        self.assertEqual(len(user_ids), 2)

        self.assertIn(self.user_two['id'], user_ids)

        self.assertIn(self.user_badguy['id'], user_ids)

**** CubicPower OpenStack Study ****

    def test_get_project_user_ids_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.list_user_ids_for_project,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_get_user(self):

        user_ref = self.identity_api.get_user(self.user_foo['id'])

        # NOTE(termie): the password field is left in user_foo to make

        #               it easier to authenticate in tests, but should

        #               not be returned by the api

        self.user_foo.pop('password')

        self.assertDictEqual(user_ref, self.user_foo)

**** CubicPower OpenStack Study ****

    def test_get_user_404(self):

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.get_user,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_get_user_by_name(self):

        user_ref = self.identity_api.get_user_by_name(

            self.user_foo['name'], DEFAULT_DOMAIN_ID)

        # NOTE(termie): the password field is left in user_foo to make

        #               it easier to authenticate in tests, but should

        #               not be returned by the api

        self.user_foo.pop('password')

        self.assertDictEqual(user_ref, self.user_foo)

**** CubicPower OpenStack Study ****

    def test_get_user_by_name_404(self):

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.get_user_by_name,

                          uuid.uuid4().hex,

                          DEFAULT_DOMAIN_ID)

**** CubicPower OpenStack Study ****

    def test_get_role(self):

        role_ref = self.assignment_api.get_role(self.role_admin['id'])

        role_ref_dict = dict((x, role_ref[x]) for x in role_ref)

        self.assertDictEqual(role_ref_dict, self.role_admin)

**** CubicPower OpenStack Study ****

    def test_get_role_404(self):

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.get_role,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_role_name_fails(self):

        role = {'id': 'fake1',

                'name': 'fake1name'}

        self.assignment_api.create_role('fake1', role)

        role['id'] = 'fake2'

        self.assertRaises(exception.Conflict,

                          self.assignment_api.create_role,

                          'fake2',

                          role)

**** CubicPower OpenStack Study ****

    def test_rename_duplicate_role_name_fails(self):

        role1 = {

            'id': 'fake1',

            'name': 'fake1name'

        }

        role2 = {

            'id': 'fake2',

            'name': 'fake2name'

        }

        self.assignment_api.create_role('fake1', role1)

        self.assignment_api.create_role('fake2', role2)

        role1['name'] = 'fake2name'

        self.assertRaises(exception.Conflict,

                          self.assignment_api.update_role,

                          'fake1',

                          role1)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_user_id_fails(self):

        user = {'id': 'fake1',

                'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': 'fakepass',

                'tenants': ['bar']}

        self.identity_api.create_user('fake1', user)

        user['name'] = 'fake2'

        self.assertRaises(exception.Conflict,

                          self.identity_api.create_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_user_name_fails(self):

        user = {'id': 'fake1',

                'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': 'fakepass',

                'tenants': ['bar']}

        self.identity_api.create_user('fake1', user)

        user['id'] = 'fake2'

        self.assertRaises(exception.Conflict,

                          self.identity_api.create_user,

                          'fake2',

                          user)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_user_name_in_different_domains(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        user1 = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': DEFAULT_DOMAIN_ID,

                 'password': uuid.uuid4().hex}

        user2 = {'id': uuid.uuid4().hex,

                 'name': user1['name'],

                 'domain_id': new_domain['id'],

                 'password': uuid.uuid4().hex}

        self.identity_api.create_user(user1['id'], user1)

        self.identity_api.create_user(user2['id'], user2)

**** CubicPower OpenStack Study ****

    def test_move_user_between_domains(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': domain1['id'],

                'password': uuid.uuid4().hex}

        self.identity_api.create_user(user['id'], user)

        user['domain_id'] = domain2['id']

        self.identity_api.update_user(user['id'], user)

**** CubicPower OpenStack Study ****

    def test_move_user_between_domains_with_clashing_names_fails(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        # First, create a user in domain1

        user1 = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'],

                 'password': uuid.uuid4().hex}

        self.identity_api.create_user(user1['id'], user1)

        # Now create a user in domain2 with a potentially clashing

        # name - which should work since we have domain separation

        user2 = {'id': uuid.uuid4().hex,

                 'name': user1['name'],

                 'domain_id': domain2['id'],

                 'password': uuid.uuid4().hex}

        self.identity_api.create_user(user2['id'], user2)

        # Now try and move user1 into the 2nd domain - which should

        # fail since the names clash

        user1['domain_id'] = domain2['id']

        self.assertRaises(exception.Conflict,

                          self.identity_api.update_user,

                          user1['id'],

                          user1)

**** CubicPower OpenStack Study ****

    def test_rename_duplicate_user_name_fails(self):

        user1 = {'id': 'fake1',

                 'name': 'fake1',

                 'domain_id': DEFAULT_DOMAIN_ID,

                 'password': 'fakepass',

                 'tenants': ['bar']}

        user2 = {'id': 'fake2',

                 'name': 'fake2',

                 'domain_id': DEFAULT_DOMAIN_ID,

                 'password': 'fakepass',

                 'tenants': ['bar']}

        self.identity_api.create_user('fake1', user1)

        self.identity_api.create_user('fake2', user2)

        user2['name'] = 'fake1'

        self.assertRaises(exception.Conflict,

                          self.identity_api.update_user,

                          'fake2',

                          user2)

**** CubicPower OpenStack Study ****

    def test_update_user_id_fails(self):

        user = {'id': 'fake1',

                'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': 'fakepass',

                'tenants': ['bar']}

        self.identity_api.create_user('fake1', user)

        user['id'] = 'fake2'

        self.assertRaises(exception.ValidationError,

                          self.identity_api.update_user,

                          'fake1',

                          user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['id'], 'fake1')

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.get_user,

                          'fake2')

**** CubicPower OpenStack Study ****

    def test_create_duplicate_project_id_fails(self):

        tenant = {'id': 'fake1', 'name': 'fake1',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant['name'] = 'fake2'

        self.assertRaises(exception.Conflict,

                          self.assignment_api.create_project,

                          'fake1',

                          tenant)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_project_name_fails(self):

        tenant = {'id': 'fake1', 'name': 'fake',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant['id'] = 'fake2'

        self.assertRaises(exception.Conflict,

                          self.assignment_api.create_project,

                          'fake1',

                          tenant)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_project_name_in_different_domains(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        tenant1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                   'domain_id': DEFAULT_DOMAIN_ID}

        tenant2 = {'id': uuid.uuid4().hex, 'name': tenant1['name'],

                   'domain_id': new_domain['id']}

        self.assignment_api.create_project(tenant1['id'], tenant1)

        self.assignment_api.create_project(tenant2['id'], tenant2)

**** CubicPower OpenStack Study ****

    def test_move_project_between_domains(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': domain1['id']}

        self.assignment_api.create_project(project['id'], project)

        project['domain_id'] = domain2['id']

        self.assignment_api.update_project(project['id'], project)

**** CubicPower OpenStack Study ****

    def test_move_project_between_domains_with_clashing_names_fails(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        # First, create a project in domain1

        project1 = {'id': uuid.uuid4().hex,

                    'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        # Now create a project in domain2 with a potentially clashing

        # name - which should work since we have domain separation

        project2 = {'id': uuid.uuid4().hex,

                    'name': project1['name'],

                    'domain_id': domain2['id']}

        self.assignment_api.create_project(project2['id'], project2)

        # Now try and move project1 into the 2nd domain - which should

        # fail since the names clash

        project1['domain_id'] = domain2['id']

        self.assertRaises(exception.Conflict,

                          self.assignment_api.update_project,

                          project1['id'],

                          project1)

**** CubicPower OpenStack Study ****

    def test_rename_duplicate_project_name_fails(self):

        tenant1 = {'id': 'fake1', 'name': 'fake1',

                   'domain_id': DEFAULT_DOMAIN_ID}

        tenant2 = {'id': 'fake2', 'name': 'fake2',

                   'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant1)

        self.assignment_api.create_project('fake2', tenant2)

        tenant2['name'] = 'fake1'

        self.assertRaises(exception.Error,

                          self.assignment_api.update_project,

                          'fake2',

                          tenant2)

**** CubicPower OpenStack Study ****

    def test_update_project_id_does_nothing(self):

        tenant = {'id': 'fake1', 'name': 'fake1',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant['id'] = 'fake2'

        self.assignment_api.update_project('fake1', tenant)

        tenant_ref = self.assignment_api.get_project('fake1')

        self.assertEqual(tenant_ref['id'], 'fake1')

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_project,

                          'fake2')

**** CubicPower OpenStack Study ****

    def test_list_role_assignments_unfiltered(self):

        """Test for unfiltered listing role assignments.

        Test Plan:

        - Create a domain, with a user, group & project

        - Find how many role assignments already exist (from default

          fixtures)

        - Create a grant of each type (user/group on project/domain)

        - Check the number of assignments has gone up by 4 and that

          the entries we added are in the list returned

        - Check that if we list assignments by role_id, then we get back

          assignments that only contain that role.

        """

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user['id'],

                                      new_user)

        new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_project = {'id': uuid.uuid4().hex,

                       'name': uuid.uuid4().hex,

                       'domain_id': new_domain['id']}

        self.assignment_api.create_project(new_project['id'], new_project)

        # First check how many role grants already exist

        existing_assignments = len(self.assignment_api.list_role_assignments())

        existing_assignments_for_role = len(

            self.assignment_api.list_role_assignments_for_role(

                role_id='admin'))

        # Now create the grants (roles are defined in default_fixtures)

        self.assignment_api.create_grant(user_id=new_user['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        self.assignment_api.create_grant(user_id=new_user['id'],

                                         project_id=new_project['id'],

                                         role_id='other')

        self.assignment_api.create_grant(group_id=new_group['id'],

                                         domain_id=new_domain['id'],

                                         role_id='admin')

        self.assignment_api.create_grant(group_id=new_group['id'],

                                         project_id=new_project['id'],

                                         role_id='admin')

        # Read back the full list of assignments - check it is gone up by 4

        assignment_list = self.assignment_api.list_role_assignments()

        self.assertEqual(len(assignment_list), existing_assignments + 4)

        # Now check that each of our four new entries are in the list

        self.assertIn(

            {'user_id': new_user['id'], 'domain_id': new_domain['id'],

             'role_id': 'member'},

            assignment_list)

        self.assertIn(

            {'user_id': new_user['id'], 'project_id': new_project['id'],

             'role_id': 'other'},

            assignment_list)

        self.assertIn(

            {'group_id': new_group['id'], 'domain_id': new_domain['id'],

             'role_id': 'admin'},

            assignment_list)

        self.assertIn(

            {'group_id': new_group['id'], 'project_id': new_project['id'],

             'role_id': 'admin'},

            assignment_list)

        # Read back the list of assignments for just the admin role, checking

        # this only goes up by two.

        assignment_list = self.assignment_api.list_role_assignments_for_role(

            role_id='admin')

        self.assertEqual(len(assignment_list),

                         existing_assignments_for_role + 2)

        # Now check that each of our two new entries are in the list

        self.assertIn(

            {'group_id': new_group['id'], 'domain_id': new_domain['id'],

             'role_id': 'admin'},

            assignment_list)

        self.assertIn(

            {'group_id': new_group['id'], 'project_id': new_project['id'],

             'role_id': 'admin'},

            assignment_list)

**** CubicPower OpenStack Study ****

    def test_list_role_assignments_bad_role(self):

        assignment_list = self.assignment_api.list_role_assignments_for_role(

            role_id=uuid.uuid4().hex)

        self.assertEqual(assignment_list, [])

**** CubicPower OpenStack Study ****

    def test_add_duplicate_role_grant(self):

        roles_ref = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'])

        self.assertNotIn(self.role_admin['id'], roles_ref)

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id'])

        self.assertRaises(exception.Conflict,

                          self.assignment_api.add_role_to_user_and_project,

                          self.user_foo['id'],

                          self.tenant_bar['id'],

                          self.role_admin['id'])

**** CubicPower OpenStack Study ****

    def test_get_role_by_user_and_project_with_user_in_group(self):

        """Test for get role by user and project, user was added into a group.

        Test Plan:

        - Create a user, a project & a group, add this user to group

        - Create roles and grant them to user and project

        - Check the role list get by the user and project was as expected

        """

        user_ref = {'id': uuid.uuid4().hex,

                    'name': uuid.uuid4().hex,

                    'domain_id': DEFAULT_DOMAIN_ID,

                    'password': uuid.uuid4().hex,

                    'enabled': True}

        self.identity_api.create_user(user_ref['id'], user_ref)

        project_ref = {'id': uuid.uuid4().hex,

                       'name': uuid.uuid4().hex,

                       'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project(project_ref['id'], project_ref)

        group = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': DEFAULT_DOMAIN_ID}

        group_id = self.identity_api.create_group(group['id'], group)['id']

        self.identity_api.add_user_to_group(user_ref['id'], group_id)

        role_ref_list = []

        for i in range(2):

            role_ref = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            self.assignment_api.create_role(role_ref['id'], role_ref)

            role_ref_list.append(role_ref)

            self.assignment_api.add_role_to_user_and_project(

                user_id=user_ref['id'],

                tenant_id=project_ref['id'],

                role_id=role_ref['id'])

        role_list = self.assignment_api.get_roles_for_user_and_project(

            user_id=user_ref['id'],

            tenant_id=project_ref['id'])

        self.assertEqual(set(role_list),

                         set([role_ref['id'] for role_ref in role_ref_list]))

**** CubicPower OpenStack Study ****

    def test_get_role_by_user_and_project(self):

        roles_ref = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'])

        self.assertNotIn(self.role_admin['id'], roles_ref)

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], self.role_admin['id'])

        roles_ref = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'])

        self.assertIn(self.role_admin['id'], roles_ref)

        self.assertNotIn('member', roles_ref)

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], 'member')

        roles_ref = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'])

        self.assertIn(self.role_admin['id'], roles_ref)

        self.assertIn('member', roles_ref)

**** CubicPower OpenStack Study ****

    def test_get_roles_for_user_and_domain(self):

        """Test for getting roles for user on a domain.

        Test Plan:

        - Create a domain, with 2 users

        - Check no roles yet exit

        - Give user1 two roles on the domain, user2 one role

        - Get roles on user1 and the domain - maybe sure we only

          get back the 2 roles on user1

        - Delete both roles from user1

        - Check we get no roles back for user1 on domain

        """

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                     'password': uuid.uuid4().hex, 'enabled': True,

                     'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user1['id'], new_user1)

        new_user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                     'password': uuid.uuid4().hex, 'enabled': True,

                     'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user2['id'], new_user2)

        roles_ref = self.assignment_api.list_grants(

            user_id=new_user1['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        # Now create the grants (roles are defined in default_fixtures)

        self.assignment_api.create_grant(user_id=new_user1['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        self.assignment_api.create_grant(user_id=new_user1['id'],

                                         domain_id=new_domain['id'],

                                         role_id='other')

        self.assignment_api.create_grant(user_id=new_user2['id'],

                                         domain_id=new_domain['id'],

                                         role_id='admin')

        # Read back the roles for user1 on domain

        roles_ids = self.assignment_api.get_roles_for_user_and_domain(

            new_user1['id'], new_domain['id'])

        self.assertEqual(len(roles_ids), 2)

        self.assertIn(self.role_member['id'], roles_ids)

        self.assertIn(self.role_other['id'], roles_ids)

        # Now delete both grants for user1

        self.assignment_api.delete_grant(user_id=new_user1['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        self.assignment_api.delete_grant(user_id=new_user1['id'],

                                         domain_id=new_domain['id'],

                                         role_id='other')

        roles_ref = self.assignment_api.list_grants(

            user_id=new_user1['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

**** CubicPower OpenStack Study ****

    def test_get_roles_for_user_and_domain_404(self):

        """Test errors raised when getting roles for user on a domain.

        Test Plan:

        - Check non-existing user gives UserNotFound

        - Check non-existing domain gives DomainNotFound

        """

        new_domain = self._get_domain_fixture()

        new_user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                     'password': uuid.uuid4().hex, 'enabled': True,

                     'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user1['id'], new_user1)

        self.assertRaises(exception.UserNotFound,

                          self.assignment_api.get_roles_for_user_and_domain,

                          uuid.uuid4().hex,

                          new_domain['id'])

        self.assertRaises(exception.DomainNotFound,

                          self.assignment_api.get_roles_for_user_and_domain,

                          new_user1['id'],

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_get_roles_for_user_and_project_404(self):

        self.assertRaises(exception.UserNotFound,

                          self.assignment_api.get_roles_for_user_and_project,

                          uuid.uuid4().hex,

                          self.tenant_bar['id'])

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_roles_for_user_and_project,

                          self.user_foo['id'],

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_add_role_to_user_and_project_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.add_role_to_user_and_project,

                          self.user_foo['id'],

                          uuid.uuid4().hex,

                          self.role_admin['id'])

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.add_role_to_user_and_project,

                          self.user_foo['id'],

                          self.tenant_bar['id'],

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_add_role_to_user_and_project_no_user(self):

        # If add_role_to_user_and_project and the user doesn't exist, then

        # no error.

        user_id_not_exist = uuid.uuid4().hex

        self.assignment_api.add_role_to_user_and_project(

            user_id_not_exist, self.tenant_bar['id'], self.role_admin['id'])

**** CubicPower OpenStack Study ****

    def test_remove_role_from_user_and_project(self):

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], 'member')

        self.assignment_api.remove_role_from_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], 'member')

        roles_ref = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'])

        self.assertNotIn('member', roles_ref)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.

                          remove_role_from_user_and_project,

                          self.user_foo['id'],

                          self.tenant_bar['id'],

                          'member')

**** CubicPower OpenStack Study ****

    def test_get_role_grant_by_user_and_project(self):

        roles_ref = self.assignment_api.list_grants(

            user_id=self.user_foo['id'],

            project_id=self.tenant_bar['id'])

        self.assertEqual(len(roles_ref), 1)

        self.assignment_api.create_grant(user_id=self.user_foo['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id=self.role_admin['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=self.user_foo['id'],

            project_id=self.tenant_bar['id'])

        self.assertIn(self.role_admin['id'],

                      [role_ref['id'] for role_ref in roles_ref])

        self.assignment_api.create_grant(user_id=self.user_foo['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            user_id=self.user_foo['id'],

            project_id=self.tenant_bar['id'])

        roles_ref_ids = []

        for ref in roles_ref:

            roles_ref_ids.append(ref['id'])

        self.assertIn(self.role_admin['id'], roles_ref_ids)

        self.assertIn('member', roles_ref_ids)

**** CubicPower OpenStack Study ****

    def test_remove_role_grant_from_user_and_project(self):

        self.assignment_api.create_grant(user_id=self.user_foo['id'],

                                         project_id=self.tenant_baz['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            user_id=self.user_foo['id'],

            project_id=self.tenant_baz['id'])

        self.assertDictEqual(roles_ref[0], self.role_member)

        self.assignment_api.delete_grant(user_id=self.user_foo['id'],

                                         project_id=self.tenant_baz['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            user_id=self.user_foo['id'],

            project_id=self.tenant_baz['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          user_id=self.user_foo['id'],

                          project_id=self.tenant_baz['id'],

                          role_id='member')

**** CubicPower OpenStack Study ****

    def test_get_and_remove_role_grant_by_group_and_project(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': 'secret', 'enabled': True,

                    'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            project_id=self.tenant_bar['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(group_id=new_group['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            project_id=self.tenant_bar['id'])

        self.assertDictEqual(roles_ref[0], self.role_member)

        self.assignment_api.delete_grant(group_id=new_group['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            project_id=self.tenant_bar['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          group_id=new_group['id'],

                          project_id=self.tenant_bar['id'],

                          role_id='member')

**** CubicPower OpenStack Study ****

    def test_get_and_remove_role_grant_by_group_and_domain(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(group_id=new_group['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            domain_id=new_domain['id'])

        self.assertDictEqual(roles_ref[0], self.role_member)

        self.assignment_api.delete_grant(group_id=new_group['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          group_id=new_group['id'],

                          domain_id=new_domain['id'],

                          role_id='member')

**** CubicPower OpenStack Study ****

    def test_get_and_remove_correct_role_grant_from_a_mix(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        new_project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                       'domain_id': new_domain['id']}

        self.assignment_api.create_project(new_project['id'], new_project)

        new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_group2 = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],

                      'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group2['id'], new_group2)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        new_user2 = {'id': uuid.uuid4().hex, 'name': 'new_user2',

                     'password': uuid.uuid4().hex, 'enabled': True,

                     'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user2['id'], new_user2)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        # First check we have no grants

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        # Now add the grant we are going to test for, and some others as

        # well just to make sure we get back the right one

        self.assignment_api.create_grant(group_id=new_group['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        self.assignment_api.create_grant(group_id=new_group2['id'],

                                         domain_id=new_domain['id'],

                                         role_id=self.role_admin['id'])

        self.assignment_api.create_grant(user_id=new_user2['id'],

                                         domain_id=new_domain['id'],

                                         role_id=self.role_admin['id'])

        self.assignment_api.create_grant(group_id=new_group['id'],

                                         project_id=new_project['id'],

                                         role_id=self.role_admin['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            domain_id=new_domain['id'])

        self.assertDictEqual(roles_ref[0], self.role_member)

        self.assignment_api.delete_grant(group_id=new_group['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            group_id=new_group['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          group_id=new_group['id'],

                          domain_id=new_domain['id'],

                          role_id='member')

**** CubicPower OpenStack Study ****

    def test_get_and_remove_role_grant_by_user_and_domain(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': 'secret', 'enabled': True,

                    'domain_id': new_domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        roles_ref = self.assignment_api.list_grants(

            user_id=new_user['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(user_id=new_user['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            user_id=new_user['id'],

            domain_id=new_domain['id'])

        self.assertDictEqual(roles_ref[0], self.role_member)

        self.assignment_api.delete_grant(user_id=new_user['id'],

                                         domain_id=new_domain['id'],

                                         role_id='member')

        roles_ref = self.assignment_api.list_grants(

            user_id=new_user['id'],

            domain_id=new_domain['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          user_id=new_user['id'],

                          domain_id=new_domain['id'],

                          role_id='member')

**** CubicPower OpenStack Study ****

    def test_get_and_remove_role_grant_by_group_and_cross_domain(self):

        group1_domain1_role = {'id': uuid.uuid4().hex,

                               'name': uuid.uuid4().hex}

        self.assignment_api.create_role(group1_domain1_role['id'],

                                        group1_domain1_role)

        group1_domain2_role = {'id': uuid.uuid4().hex,

                               'name': uuid.uuid4().hex}

        self.assignment_api.create_role(group1_domain2_role['id'],

                                        group1_domain2_role)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        group1 = {'id': uuid.uuid4().hex, 'domain_id': domain1['id'],

                  'name': uuid.uuid4().hex}

        self.identity_api.create_group(group1['id'], group1)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 0)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain2['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=group1_domain1_role['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain2['id'],

                                         role_id=group1_domain2_role['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain1['id'])

        self.assertDictEqual(roles_ref[0], group1_domain1_role)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain2['id'])

        self.assertDictEqual(roles_ref[0], group1_domain2_role)

        self.assignment_api.delete_grant(group_id=group1['id'],

                                         domain_id=domain2['id'],

                                         role_id=group1_domain2_role['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain2['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          group_id=group1['id'],

                          domain_id=domain2['id'],

                          role_id=group1_domain2_role['id'])

**** CubicPower OpenStack Study ****

    def test_get_and_remove_role_grant_by_user_and_cross_domain(self):

        user1_domain1_role = {'id': uuid.uuid4().hex,

                              'name': uuid.uuid4().hex}

        self.assignment_api.create_role(user1_domain1_role['id'],

                                        user1_domain1_role)

        user1_domain2_role = {'id': uuid.uuid4().hex,

                              'name': uuid.uuid4().hex}

        self.assignment_api.create_role(user1_domain2_role['id'],

                                        user1_domain2_role)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 0)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain2['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=user1_domain1_role['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain2['id'],

                                         role_id=user1_domain2_role['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain1['id'])

        self.assertDictEqual(roles_ref[0], user1_domain1_role)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain2['id'])

        self.assertDictEqual(roles_ref[0], user1_domain2_role)

        self.assignment_api.delete_grant(user_id=user1['id'],

                                         domain_id=domain2['id'],

                                         role_id=user1_domain2_role['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain2['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.delete_grant,

                          user_id=user1['id'],

                          domain_id=domain2['id'],

                          role_id=user1_domain2_role['id'])

**** CubicPower OpenStack Study ****

    def test_role_grant_by_group_and_cross_domain_project(self):

        role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role1['id'], role1)

        role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role2['id'], role2)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain2['id']}

        self.assignment_api.create_project(project1['id'], project1)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role2['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            project_id=project1['id'])

        roles_ref_ids = []

        for ref in roles_ref:

            roles_ref_ids.append(ref['id'])

        self.assertIn(role1['id'], roles_ref_ids)

        self.assertIn(role2['id'], roles_ref_ids)

        self.assignment_api.delete_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 1)

        self.assertDictEqual(roles_ref[0], role2)

**** CubicPower OpenStack Study ****

    def test_role_grant_by_user_and_cross_domain_project(self):

        role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role1['id'], role1)

        role2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role2['id'], role2)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain2['id']}

        self.assignment_api.create_project(project1['id'], project1)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role2['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        roles_ref_ids = []

        for ref in roles_ref:

            roles_ref_ids.append(ref['id'])

        self.assertIn(role1['id'], roles_ref_ids)

        self.assertIn(role2['id'], roles_ref_ids)

        self.assignment_api.delete_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 1)

        self.assertDictEqual(roles_ref[0], role2)

**** CubicPower OpenStack Study ****

    def test_delete_user_grant_no_user(self):

        # Can delete a grant where the user doesn't exist.

        role_id = uuid.uuid4().hex

        role = {'id': role_id, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role_id, role)

        user_id = uuid.uuid4().hex

        self.assignment_api.create_grant(role_id, user_id=user_id,

                                         project_id=self.tenant_bar['id'])

        self.assignment_api.delete_grant(role_id, user_id=user_id,

                                         project_id=self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_delete_group_grant_no_group(self):

        # Can delete a grant where the group doesn't exist.

        role_id = uuid.uuid4().hex

        role = {'id': role_id, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role_id, role)

        group_id = uuid.uuid4().hex

        self.assignment_api.create_grant(role_id, group_id=group_id,

                                         project_id=self.tenant_bar['id'])

        self.assignment_api.delete_grant(role_id, group_id=group_id,

                                         project_id=self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_multi_role_grant_by_user_group_on_project_domain(self):

        role_list = []

        for _ in range(10):

            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            self.assignment_api.create_role(role['id'], role)

            role_list.append(role)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group2['id'], group2)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        self.identity_api.add_user_to_group(user1['id'],

                                            group1['id'])

        self.identity_api.add_user_to_group(user1['id'],

                                            group2['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[0]['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[1]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[2]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[3]['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[4]['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[5]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[6]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[7]['id'])

        roles_ref = self.assignment_api.list_grants(user_id=user1['id'],

                                                    domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 2)

        self.assertIn(role_list[0], roles_ref)

        self.assertIn(role_list[1], roles_ref)

        roles_ref = self.assignment_api.list_grants(group_id=group1['id'],

                                                    domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 2)

        self.assertIn(role_list[2], roles_ref)

        self.assertIn(role_list[3], roles_ref)

        roles_ref = self.assignment_api.list_grants(user_id=user1['id'],

                                                    project_id=project1['id'])

        self.assertEqual(len(roles_ref), 2)

        self.assertIn(role_list[4], roles_ref)

        self.assertIn(role_list[5], roles_ref)

        roles_ref = self.assignment_api.list_grants(group_id=group1['id'],

                                                    project_id=project1['id'])

        self.assertEqual(len(roles_ref), 2)

        self.assertIn(role_list[6], roles_ref)

        self.assertIn(role_list[7], roles_ref)

        # Now test the alternate way of getting back lists of grants,

        # where user and group roles are combined.  These should match

        # the above results.

        combined_list = self.assignment_api.get_roles_for_user_and_project(

            user1['id'], project1['id'])

        self.assertEqual(len(combined_list), 4)

        self.assertIn(role_list[4]['id'], combined_list)

        self.assertIn(role_list[5]['id'], combined_list)

        self.assertIn(role_list[6]['id'], combined_list)

        self.assertIn(role_list[7]['id'], combined_list)

        combined_role_list = self.assignment_api.get_roles_for_user_and_domain(

            user1['id'], domain1['id'])

        self.assertEqual(len(combined_role_list), 4)

        self.assertIn(role_list[0]['id'], combined_role_list)

        self.assertIn(role_list[1]['id'], combined_role_list)

        self.assertIn(role_list[2]['id'], combined_role_list)

        self.assertIn(role_list[3]['id'], combined_role_list)

**** CubicPower OpenStack Study ****

    def test_multi_group_grants_on_project_domain(self):

        """Test multiple group roles for user on project and domain.

        Test Plan:

        - Create 6 roles

        - Create a domain, with a project, user and two groups

        - Make the user a member of both groups

        - Check no roles yet exit

        - Assign a role to each user and both groups on both the

          project and domain

        - Get a list of effective roles for the user on both the

          project and domain, checking we get back the correct three

          roles

        """

        role_list = []

        for _ in range(6):

            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            self.assignment_api.create_role(role['id'], role)

            role_list.append(role)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group2['id'], group2)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        self.identity_api.add_user_to_group(user1['id'],

                                            group1['id'])

        self.identity_api.add_user_to_group(user1['id'],

                                            group2['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[0]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[1]['id'])

        self.assignment_api.create_grant(group_id=group2['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[2]['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[3]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[4]['id'])

        self.assignment_api.create_grant(group_id=group2['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[5]['id'])

        # Read by the roles, ensuring we get the correct 3 roles for

        # both project and domain

        combined_list = self.assignment_api.get_roles_for_user_and_project(

            user1['id'], project1['id'])

        self.assertEqual(len(combined_list), 3)

        self.assertIn(role_list[3]['id'], combined_list)

        self.assertIn(role_list[4]['id'], combined_list)

        self.assertIn(role_list[5]['id'], combined_list)

        combined_role_list = self.assignment_api.get_roles_for_user_and_domain(

            user1['id'], domain1['id'])

        self.assertEqual(len(combined_role_list), 3)

        self.assertIn(role_list[0]['id'], combined_role_list)

        self.assertIn(role_list[1]['id'], combined_role_list)

        self.assertIn(role_list[2]['id'], combined_role_list)

**** CubicPower OpenStack Study ****

    def test_delete_role_with_user_and_group_grants(self):

        role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role1['id'], role1)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role1['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 1)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 1)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 1)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 1)

        self.assignment_api.delete_role(role1['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 0)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 0)

**** CubicPower OpenStack Study ****

    def test_delete_user_with_group_project_domain_links(self):

        role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role1['id'], role1)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role1['id'])

        self.identity_api.add_user_to_group(user_id=user1['id'],

                                            group_id=group1['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 1)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 1)

        self.identity_api.check_user_in_group(

            user_id=user1['id'],

            group_id=group1['id'])

        self.identity_api.delete_user(user1['id'])

        self.assertRaises(exception.NotFound,

                          self.identity_api.check_user_in_group,

                          user1['id'],

                          group1['id'])

**** CubicPower OpenStack Study ****

    def test_delete_group_with_user_project_domain_links(self):

        role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role1['id'], role1)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=role1['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role1['id'])

        self.identity_api.add_user_to_group(user_id=user1['id'],

                                            group_id=group1['id'])

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 1)

        roles_ref = self.assignment_api.list_grants(

            group_id=group1['id'],

            domain_id=domain1['id'])

        self.assertEqual(len(roles_ref), 1)

        self.identity_api.check_user_in_group(

            user_id=user1['id'],

            group_id=group1['id'])

        self.identity_api.delete_group(group1['id'])

        self.identity_api.get_user(user1['id'])

**** CubicPower OpenStack Study ****

    def test_delete_domain_with_user_group_project_links(self):

        #TODO(chungg):add test case once expected behaviour defined

        pass

**** CubicPower OpenStack Study ****

    def test_role_crud(self):

        role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role['id'], role)

        role_ref = self.assignment_api.get_role(role['id'])

        role_ref_dict = dict((x, role_ref[x]) for x in role_ref)

        self.assertDictEqual(role_ref_dict, role)

        role['name'] = uuid.uuid4().hex

        updated_role_ref = self.assignment_api.update_role(role['id'], role)

        role_ref = self.assignment_api.get_role(role['id'])

        role_ref_dict = dict((x, role_ref[x]) for x in role_ref)

        self.assertDictEqual(role_ref_dict, role)

        self.assertDictEqual(role_ref_dict, updated_role_ref)

        self.assignment_api.delete_role(role['id'])

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.get_role,

                          role['id'])

**** CubicPower OpenStack Study ****

    def test_update_role_404(self):

        role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.update_role,

                          role['id'],

                          role)

**** CubicPower OpenStack Study ****

    def test_add_user_to_project(self):

        self.assignment_api.add_user_to_project(self.tenant_baz['id'],

                                                self.user_foo['id'])

        tenants = self.assignment_api.list_projects_for_user(

            self.user_foo['id'])

        self.assertIn(self.tenant_baz, tenants)

**** CubicPower OpenStack Study ****

    def test_add_user_to_project_missing_default_role(self):

        self.assignment_api.delete_role(CONF.member_role_id)

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.get_role,

                          CONF.member_role_id)

        self.assignment_api.add_user_to_project(self.tenant_baz['id'],

                                                self.user_foo['id'])

        tenants = (

            self.assignment_api.list_projects_for_user(self.user_foo['id']))

        self.assertIn(self.tenant_baz, tenants)

        default_role = self.assignment_api.get_role(CONF.member_role_id)

        self.assertIsNotNone(default_role)

**** CubicPower OpenStack Study ****

    def test_add_user_to_project_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.add_user_to_project,

                          uuid.uuid4().hex,

                          self.user_foo['id'])

**** CubicPower OpenStack Study ****

    def test_add_user_to_project_no_user(self):

        # If add_user_to_project and the user doesn't exist, then

        # no error.

        user_id_not_exist = uuid.uuid4().hex

        self.assignment_api.add_user_to_project(self.tenant_bar['id'],

                                                user_id_not_exist)

**** CubicPower OpenStack Study ****

    def test_remove_user_from_project(self):

        self.assignment_api.add_user_to_project(self.tenant_baz['id'],

                                                self.user_foo['id'])

        self.assignment_api.remove_user_from_project(self.tenant_baz['id'],

                                                     self.user_foo['id'])

        tenants = self.assignment_api.list_projects_for_user(

            self.user_foo['id'])

        self.assertNotIn(self.tenant_baz, tenants)

**** CubicPower OpenStack Study ****

    def test_remove_user_from_project_race_delete_role(self):

        self.assignment_api.add_user_to_project(self.tenant_baz['id'],

                                                self.user_foo['id'])

        self.assignment_api.add_role_to_user_and_project(

            tenant_id=self.tenant_baz['id'],

            user_id=self.user_foo['id'],

            role_id=self.role_other['id'])

        # Mock a race condition, delete a role after

        # get_roles_for_user_and_project() is called in

        # remove_user_from_project().

        roles = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_baz['id'])

        self.assignment_api.delete_role(self.role_other['id'])

        self.assignment_api.get_roles_for_user_and_project = mock.Mock(

            return_value=roles)

        self.assignment_api.remove_user_from_project(self.tenant_baz['id'],

                                                     self.user_foo['id'])

        tenants = self.assignment_api.list_projects_for_user(

            self.user_foo['id'])

        self.assertNotIn(self.tenant_baz, tenants)

**** CubicPower OpenStack Study ****

    def test_remove_user_from_project_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.remove_user_from_project,

                          uuid.uuid4().hex,

                          self.user_foo['id'])

        self.assertRaises(exception.UserNotFound,

                          self.assignment_api.remove_user_from_project,

                          self.tenant_bar['id'],

                          uuid.uuid4().hex)

        self.assertRaises(exception.NotFound,

                          self.assignment_api.remove_user_from_project,

                          self.tenant_baz['id'],

                          self.user_foo['id'])

**** CubicPower OpenStack Study ****

    def test_list_user_project_ids_404(self):

        self.assertRaises(exception.UserNotFound,

                          self.assignment_api.list_projects_for_user,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_update_project_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.update_project,

                          uuid.uuid4().hex,

                          dict())

**** CubicPower OpenStack Study ****

    def test_delete_project_404(self):

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.delete_project,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_update_user_404(self):

        user_id = uuid.uuid4().hex

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.update_user,

                          user_id,

                          {'id': user_id,

                           'domain_id': DEFAULT_DOMAIN_ID})

**** CubicPower OpenStack Study ****

    def test_delete_user_with_project_association(self):

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': uuid.uuid4().hex}

        self.identity_api.create_user(user['id'], user)

        self.assignment_api.add_user_to_project(self.tenant_bar['id'],

                                                user['id'])

        self.identity_api.delete_user(user['id'])

        self.assertRaises(exception.UserNotFound,

                          self.assignment_api.list_projects_for_user,

                          user['id'])

**** CubicPower OpenStack Study ****

    def test_delete_user_with_project_roles(self):

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': uuid.uuid4().hex}

        self.identity_api.create_user(user['id'], user)

        self.assignment_api.add_role_to_user_and_project(

            user['id'],

            self.tenant_bar['id'],

            self.role_member['id'])

        self.identity_api.delete_user(user['id'])

        self.assertRaises(exception.UserNotFound,

                          self.assignment_api.list_projects_for_user,

                          user['id'])

**** CubicPower OpenStack Study ****

    def test_delete_user_404(self):

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.delete_user,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_delete_role_404(self):

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.delete_role,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_create_update_delete_unicode_project(self):

        unicode_project_name = u'name \u540d\u5b57'

        project = {'id': uuid.uuid4().hex,

                   'name': unicode_project_name,

                   'description': uuid.uuid4().hex,

                   'domain_id': CONF.identity.default_domain_id}

        self.assignment_api.create_project(project['id'], project)

        self.assignment_api.update_project(project['id'], project)

        self.assignment_api.delete_project(project['id'])

**** CubicPower OpenStack Study ****

    def test_create_project_case_sensitivity(self):

        # create a ref with a lowercase name

        ref = {

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex.lower(),

            'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project(ref['id'], ref)

        # assign a new ID with the same name, but this time in uppercase

        ref['id'] = uuid.uuid4().hex

        ref['name'] = ref['name'].upper()

        self.assignment_api.create_project(ref['id'], ref)

**** CubicPower OpenStack Study ****

    def test_create_project_with_no_enabled_field(self):

        ref = {

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex.lower(),

            'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project(ref['id'], ref)

        project = self.assignment_api.get_project(ref['id'])

        self.assertIs(project['enabled'], True)

**** CubicPower OpenStack Study ****

    def test_create_project_long_name_fails(self):

        tenant = {'id': 'fake1', 'name': 'a' * 65,

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.create_project,

                          tenant['id'],

                          tenant)

**** CubicPower OpenStack Study ****

    def test_create_project_blank_name_fails(self):

        tenant = {'id': 'fake1', 'name': '',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.create_project,

                          tenant['id'],

                          tenant)

**** CubicPower OpenStack Study ****

    def test_create_project_invalid_name_fails(self):

        tenant = {'id': 'fake1', 'name': None,

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.create_project,

                          tenant['id'],

                          tenant)

        tenant = {'id': 'fake1', 'name': 123,

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.create_project,

                          tenant['id'],

                          tenant)

**** CubicPower OpenStack Study ****

    def test_update_project_blank_name_fails(self):

        tenant = {'id': 'fake1', 'name': 'fake1',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant['name'] = ''

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.update_project,

                          tenant['id'],

                          tenant)

**** CubicPower OpenStack Study ****

    def test_update_project_long_name_fails(self):

        tenant = {'id': 'fake1', 'name': 'fake1',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant['name'] = 'a' * 65

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.update_project,

                          tenant['id'],

                          tenant)

**** CubicPower OpenStack Study ****

    def test_update_project_invalid_name_fails(self):

        tenant = {'id': 'fake1', 'name': 'fake1',

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant['name'] = None

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.update_project,

                          tenant['id'],

                          tenant)

        tenant['name'] = 123

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.update_project,

                          tenant['id'],

                          tenant)

**** CubicPower OpenStack Study ****

    def test_create_user_case_sensitivity(self):

        # create a ref with a lowercase name

        ref = {

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex.lower(),

            'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user(ref['id'], ref)

        # assign a new ID with the same name, but this time in uppercase

        ref['id'] = uuid.uuid4().hex

        ref['name'] = ref['name'].upper()

        self.identity_api.create_user(ref['id'], ref)

**** CubicPower OpenStack Study ****

    def test_create_user_long_name_fails(self):

        user = {'id': 'fake1', 'name': 'a' * 256,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.identity_api.create_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_create_user_blank_name_fails(self):

        user = {'id': 'fake1', 'name': '',

                'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.identity_api.create_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_create_user_missed_password(self):

        user = {'id': 'fake1', 'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        self.identity_api.get_user('fake1')

        # Make sure  the user is not allowed to login

        # with a password that  is empty string or None

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id='fake1',

                          password='')

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id='fake1',

                          password=None)

**** CubicPower OpenStack Study ****

    def test_create_user_none_password(self):

        user = {'id': 'fake1', 'name': 'fake1', 'password': None,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        self.identity_api.get_user('fake1')

        # Make sure  the user is not allowed to login

        # with a password that  is empty string or None

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id='fake1',

                          password='')

        self.assertRaises(AssertionError,

                          self.identity_api.authenticate,

                          context={},

                          user_id='fake1',

                          password=None)

**** CubicPower OpenStack Study ****

    def test_create_user_invalid_name_fails(self):

        user = {'id': 'fake1', 'name': None,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.identity_api.create_user,

                          'fake1',

                          user)

        user = {'id': 'fake1', 'name': 123,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.assertRaises(exception.ValidationError,

                          self.identity_api.create_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_update_project_invalid_enabled_type_string(self):

            project = {'id': uuid.uuid4().hex,

                       'name': uuid.uuid4().hex,

                       'enabled': True,

                       'domain_id': DEFAULT_DOMAIN_ID}

            self.assignment_api.create_project(project['id'], project)

            project_ref = self.assignment_api.get_project(project['id'])

            self.assertEqual(project_ref['enabled'], True)

            # Strings are not valid boolean values

            project['enabled'] = "false"

            self.assertRaises(exception.ValidationError,

                              self.assignment_api.update_project,

                              project['id'],

                              project)

**** CubicPower OpenStack Study ****

    def test_create_project_invalid_enabled_type_string(self):

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': DEFAULT_DOMAIN_ID,

                   # invalid string value

                   'enabled': "true"}

        self.assertRaises(exception.ValidationError,

                          self.assignment_api.create_project,

                          project['id'],

                          project)

**** CubicPower OpenStack Study ****

    def test_create_user_invalid_enabled_type_string(self):

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'domain_id': DEFAULT_DOMAIN_ID,

                'password': uuid.uuid4().hex,

                # invalid string value

                'enabled': "true"}

        self.assertRaises(exception.ValidationError,

                          self.identity_api.create_user,

                          user['id'],

                          user)

**** CubicPower OpenStack Study ****

    def test_update_user_long_name_fails(self):

        user = {'id': 'fake1', 'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        user['name'] = 'a' * 256

        self.assertRaises(exception.ValidationError,

                          self.identity_api.update_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_update_user_blank_name_fails(self):

        user = {'id': 'fake1', 'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        user['name'] = ''

        self.assertRaises(exception.ValidationError,

                          self.identity_api.update_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_update_user_invalid_name_fails(self):

        user = {'id': 'fake1', 'name': 'fake1',

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        user['name'] = None

        self.assertRaises(exception.ValidationError,

                          self.identity_api.update_user,

                          'fake1',

                          user)

        user['name'] = 123

        self.assertRaises(exception.ValidationError,

                          self.identity_api.update_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_list_users(self):

        users = self.identity_api.list_users()

        self.assertEqual(len(default_fixtures.USERS), len(users))

        user_ids = set(user['id'] for user in users)

        expected_user_ids = set(user['id'] for user in default_fixtures.USERS)

        for user_ref in users:

            self.assertNotIn('password', user_ref)

        self.assertEqual(expected_user_ids, user_ids)

**** CubicPower OpenStack Study ****

    def test_list_groups(self):

        group1 = {

            'id': uuid.uuid4().hex,

            'domain_id': DEFAULT_DOMAIN_ID,

            'name': uuid.uuid4().hex}

        group2 = {

            'id': uuid.uuid4().hex,

            'domain_id': DEFAULT_DOMAIN_ID,

            'name': uuid.uuid4().hex}

        self.identity_api.create_group(group1['id'], group1)

        self.identity_api.create_group(group2['id'], group2)

        groups = self.identity_api.list_groups()

        self.assertEqual(len(groups), 2)

        group_ids = []

        for group in groups:

            group_ids.append(group.get('id'))

        self.assertIn(group1['id'], group_ids)

        self.assertIn(group2['id'], group_ids)

**** CubicPower OpenStack Study ****

    def test_list_domains(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        self.assignment_api.create_domain(domain2['id'], domain2)

        domains = self.assignment_api.list_domains()

        self.assertEqual(len(domains), 3)

        domain_ids = []

        for domain in domains:

            domain_ids.append(domain.get('id'))

        self.assertIn(DEFAULT_DOMAIN_ID, domain_ids)

        self.assertIn(domain1['id'], domain_ids)

        self.assertIn(domain2['id'], domain_ids)

**** CubicPower OpenStack Study ****

    def test_list_projects(self):

        projects = self.assignment_api.list_projects()

        self.assertEqual(len(projects), 4)

        project_ids = []

        for project in projects:

            project_ids.append(project.get('id'))

        self.assertIn(self.tenant_bar['id'], project_ids)

        self.assertIn(self.tenant_baz['id'], project_ids)

**** CubicPower OpenStack Study ****

    def test_list_projects_for_domain(self):

        project_ids = ([x['id'] for x in

                       self.assignment_api.list_projects_in_domain(

                           DEFAULT_DOMAIN_ID)])

        self.assertEqual(len(project_ids), 4)

        self.assertIn(self.tenant_bar['id'], project_ids)

        self.assertIn(self.tenant_baz['id'], project_ids)

        self.assertIn(self.tenant_mtu['id'], project_ids)

        self.assertIn(self.tenant_service['id'], project_ids)

**** CubicPower OpenStack Study ****

    def test_list_projects_for_alternate_domain(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project2['id'], project2)

        project_ids = ([x['id'] for x in

                       self.assignment_api.list_projects_in_domain(

                           domain1['id'])])

        self.assertEqual(len(project_ids), 2)

        self.assertIn(project1['id'], project_ids)

        self.assertIn(project2['id'], project_ids)

**** CubicPower OpenStack Study ****

    def test_list_roles(self):

        roles = self.assignment_api.list_roles()

        self.assertEqual(len(default_fixtures.ROLES), len(roles))

        role_ids = set(role['id'] for role in roles)

        expected_role_ids = set(role['id'] for role in default_fixtures.ROLES)

        self.assertEqual(expected_role_ids, role_ids)

**** CubicPower OpenStack Study ****

    def test_delete_project_with_role_assignments(self):

        tenant = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project(tenant['id'], tenant)

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], tenant['id'], 'member')

        self.assignment_api.delete_project(tenant['id'])

        self.assertRaises(exception.NotFound,

                          self.assignment_api.get_project,

                          tenant['id'])

**** CubicPower OpenStack Study ****

    def test_delete_role_check_role_grant(self):

        role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        alt_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_role(role['id'], role)

        self.assignment_api.create_role(alt_role['id'], alt_role)

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], role['id'])

        self.assignment_api.add_role_to_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'], alt_role['id'])

        self.assignment_api.delete_role(role['id'])

        roles_ref = self.assignment_api.get_roles_for_user_and_project(

            self.user_foo['id'], self.tenant_bar['id'])

        self.assertNotIn(role['id'], roles_ref)

        self.assertIn(alt_role['id'], roles_ref)

**** CubicPower OpenStack Study ****

    def test_create_project_doesnt_modify_passed_in_dict(self):

        new_project = {'id': 'tenant_id', 'name': uuid.uuid4().hex,

                       'domain_id': DEFAULT_DOMAIN_ID}

        original_project = new_project.copy()

        self.assignment_api.create_project('tenant_id', new_project)

        self.assertDictEqual(original_project, new_project)

**** CubicPower OpenStack Study ****

    def test_create_user_doesnt_modify_passed_in_dict(self):

        new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'password': uuid.uuid4().hex,

                    'domain_id': DEFAULT_DOMAIN_ID}

        original_user = new_user.copy()

        self.identity_api.create_user('user_id', new_user)

        self.assertDictEqual(original_user, new_user)

**** CubicPower OpenStack Study ****

    def test_update_user_enable(self):

        user = {'id': 'fake1', 'name': 'fake1', 'enabled': True,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], True)

        user['enabled'] = False

        self.identity_api.update_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], user['enabled'])

        # If not present, enabled field should not be updated

        del user['enabled']

        self.identity_api.update_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], False)

        user['enabled'] = True

        self.identity_api.update_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], user['enabled'])

        del user['enabled']

        self.identity_api.update_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], True)

        # Integers are valid Python's booleans. Explicitly test it.

        user['enabled'] = 0

        self.identity_api.update_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], False)

        # Any integers other than 0 are interpreted as True

        user['enabled'] = -42

        self.identity_api.update_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], True)

**** CubicPower OpenStack Study ****

    def test_update_user_name(self):

        user = {'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex,

                'enabled': True,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user(user['id'], user)

        user_ref = self.identity_api.get_user(user['id'])

        self.assertEqual(user['name'], user_ref['name'])

        changed_name = user_ref['name'] + '_changed'

        user_ref['name'] = changed_name

        updated_user = self.identity_api.update_user(user_ref['id'], user_ref)

        # NOTE(dstanek): the SQL backend adds an 'extra' field containing a

        #                dictionary of the extra fields in addition to the

        #                fields in the object. For the details see:

        #                SqlIdentity.test_update_project_returns_extra

        updated_user.pop('extra', None)

        self.assertDictEqual(user_ref, updated_user)

        user_ref = self.identity_api.get_user(user_ref['id'])

        self.assertEqual(user_ref['name'], changed_name)

**** CubicPower OpenStack Study ****

    def test_update_user_enable_fails(self):

        user = {'id': 'fake1', 'name': 'fake1', 'enabled': True,

                'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user('fake1', user)

        user_ref = self.identity_api.get_user('fake1')

        self.assertEqual(user_ref['enabled'], True)

        # Strings are not valid boolean values

        user['enabled'] = "false"

        self.assertRaises(exception.ValidationError,

                          self.identity_api.update_user,

                          'fake1',

                          user)

**** CubicPower OpenStack Study ****

    def test_update_project_enable(self):

        tenant = {'id': 'fake1', 'name': 'fake1', 'enabled': True,

                  'domain_id': DEFAULT_DOMAIN_ID}

        self.assignment_api.create_project('fake1', tenant)

        tenant_ref = self.assignment_api.get_project('fake1')

        self.assertEqual(tenant_ref['enabled'], True)

        tenant['enabled'] = False

        self.assignment_api.update_project('fake1', tenant)

        tenant_ref = self.assignment_api.get_project('fake1')

        self.assertEqual(tenant_ref['enabled'], tenant['enabled'])

        # If not present, enabled field should not be updated

        del tenant['enabled']

        self.assignment_api.update_project('fake1', tenant)

        tenant_ref = self.assignment_api.get_project('fake1')

        self.assertEqual(tenant_ref['enabled'], False)

        tenant['enabled'] = True

        self.assignment_api.update_project('fake1', tenant)

        tenant_ref = self.assignment_api.get_project('fake1')

        self.assertEqual(tenant_ref['enabled'], tenant['enabled'])

        del tenant['enabled']

        self.assignment_api.update_project('fake1', tenant)

        tenant_ref = self.assignment_api.get_project('fake1')

        self.assertEqual(tenant_ref['enabled'], True)

**** CubicPower OpenStack Study ****

    def test_add_user_to_group(self):

        domain = self._get_domain_fixture()

        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        groups = self.identity_api.list_groups_for_user(new_user['id'])

        found = False

        for x in groups:

            if (x['id'] == new_group['id']):

                found = True

        self.assertTrue(found)

**** CubicPower OpenStack Study ****

    def test_add_user_to_group_404(self):

        domain = self._get_domain_fixture()

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.assertRaises(exception.GroupNotFound,

                          self.identity_api.add_user_to_group,

                          new_user['id'],

                          uuid.uuid4().hex)

        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.add_user_to_group,

                          uuid.uuid4().hex,

                          new_group['id'])

**** CubicPower OpenStack Study ****

    def test_check_user_in_group(self):

        domain = self._get_domain_fixture()

        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        self.identity_api.check_user_in_group(new_user['id'], new_group['id'])

**** CubicPower OpenStack Study ****

    def test_create_invalid_domain_fails(self):

        new_group = {'id': uuid.uuid4().hex, 'domain_id': "doesnotexist",

                     'name': uuid.uuid4().hex}

        self.assertRaises(exception.DomainNotFound,

                          self.identity_api.create_group,

                          new_group['id'],

                          new_group)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': "doesnotexist"}

        self.assertRaises(exception.DomainNotFound,

                          self.identity_api.create_user,

                          new_user['id'], new_user)

**** CubicPower OpenStack Study ****

    def test_check_user_not_in_group(self):

        new_group = {

            'id': uuid.uuid4().hex,

            'domain_id': DEFAULT_DOMAIN_ID,

            'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.check_user_in_group,

                          uuid.uuid4().hex,

                          new_group['id'])

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_user(new_user['id'], new_user)

        self.assertRaises(exception.NotFound,

                          self.identity_api.check_user_in_group,

                          new_user['id'],

                          new_group['id'])

**** CubicPower OpenStack Study ****

    def test_list_users_in_group(self):

        domain = self._get_domain_fixture()

        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        # Make sure we get an empty list back on a new group, not an error.

        user_refs = self.identity_api.list_users_in_group(new_group['id'])

        self.assertEqual(user_refs, [])

        # Make sure we get the correct users back once they have been added

        # to the group.

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        user_refs = self.identity_api.list_users_in_group(new_group['id'])

        found = False

        for x in user_refs:

            if (x['id'] == new_user['id']):

                found = True

            self.assertNotIn('password', x)

        self.assertTrue(found)

**** CubicPower OpenStack Study ****

    def test_list_groups_for_user(self):

        domain = self._get_domain_fixture()

        test_groups = []

        test_users = []

        GROUP_COUNT = 3

        USER_COUNT = 2

        for x in range(0, USER_COUNT):

            new_user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                        'password': uuid.uuid4().hex, 'enabled': True,

                        'domain_id': domain['id']}

            test_users.append(new_user)

            self.identity_api.create_user(new_user['id'], new_user)

        positive_user = test_users[0]

        negative_user = test_users[1]

        for x in range(0, USER_COUNT):

            group_refs = self.identity_api.list_groups_for_user(

                test_users[x]['id'])

            self.assertEqual(len(group_refs), 0)

        for x in range(0, GROUP_COUNT):

            before_count = x

            after_count = x + 1

            new_group = {'id': uuid.uuid4().hex,

                         'domain_id': domain['id'],

                         'name': uuid.uuid4().hex}

            self.identity_api.create_group(new_group['id'], new_group)

            test_groups.append(new_group)

            #add the user to the group and ensure that the

            #group count increases by one for each

            group_refs = self.identity_api.list_groups_for_user(

                positive_user['id'])

            self.assertEqual(len(group_refs), before_count)

            self.identity_api.add_user_to_group(

                positive_user['id'],

                new_group['id'])

            group_refs = self.identity_api.list_groups_for_user(

                positive_user['id'])

            self.assertEqual(len(group_refs), after_count)

            #Make sure the group count for the unrelated user

            #did not change

            group_refs = self.identity_api.list_groups_for_user(

                negative_user['id'])

            self.assertEqual(len(group_refs), 0)

        #remove the user from each group and ensure that

        #the group count reduces by one for each

        for x in range(0, 3):

            before_count = GROUP_COUNT - x

            after_count = GROUP_COUNT - x - 1

            group_refs = self.identity_api.list_groups_for_user(

                positive_user['id'])

            self.assertEqual(len(group_refs), before_count)

            self.identity_api.remove_user_from_group(

                positive_user['id'],

                test_groups[x]['id'])

            group_refs = self.identity_api.list_groups_for_user(

                positive_user['id'])

            self.assertEqual(len(group_refs), after_count)

            #Make sure the group count for the unrelated user

            #did not change

            group_refs = self.identity_api.list_groups_for_user(

                negative_user['id'])

            self.assertEqual(len(group_refs), 0)

**** CubicPower OpenStack Study ****

    def test_remove_user_from_group(self):

        domain = self._get_domain_fixture()

        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        self.identity_api.add_user_to_group(new_user['id'],

                                            new_group['id'])

        groups = self.identity_api.list_groups_for_user(new_user['id'])

        self.assertIn(new_group['id'], [x['id'] for x in groups])

        self.identity_api.remove_user_from_group(new_user['id'],

                                                 new_group['id'])

        groups = self.identity_api.list_groups_for_user(new_user['id'])

        self.assertNotIn(new_group['id'], [x['id'] for x in groups])

**** CubicPower OpenStack Study ****

    def test_remove_user_from_group_404(self):

        domain = self._get_domain_fixture()

        new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',

                    'password': uuid.uuid4().hex, 'enabled': True,

                    'domain_id': domain['id']}

        self.identity_api.create_user(new_user['id'], new_user)

        new_group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                     'name': uuid.uuid4().hex}

        self.identity_api.create_group(new_group['id'], new_group)

        self.assertRaises(exception.NotFound,

                          self.identity_api.remove_user_from_group,

                          new_user['id'],

                          uuid.uuid4().hex)

        self.assertRaises(exception.NotFound,

                          self.identity_api.remove_user_from_group,

                          uuid.uuid4().hex,

                          new_group['id'])

        self.assertRaises(exception.NotFound,

                          self.identity_api.remove_user_from_group,

                          uuid.uuid4().hex,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_group_crud(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain['id'], domain)

        group = {'id': uuid.uuid4().hex, 'domain_id': domain['id'],

                 'name': uuid.uuid4().hex}

        self.identity_api.create_group(group['id'], group)

        group_ref = self.identity_api.get_group(group['id'])

        self.assertDictContainsSubset(group, group_ref)

        group['name'] = uuid.uuid4().hex

        self.identity_api.update_group(group['id'], group)

        group_ref = self.identity_api.get_group(group['id'])

        self.assertDictContainsSubset(group, group_ref)

        self.identity_api.delete_group(group['id'])

        self.assertRaises(exception.GroupNotFound,

                          self.identity_api.get_group,

                          group['id'])

**** CubicPower OpenStack Study ****

    def test_create_duplicate_group_name_fails(self):

        group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID,

                  'name': uuid.uuid4().hex}

        group2 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID,

                  'name': group1['name']}

        self.identity_api.create_group(group1['id'], group1)

        self.assertRaises(exception.Conflict,

                          self.identity_api.create_group,

                          group2['id'], group2)

**** CubicPower OpenStack Study ****

    def test_create_duplicate_group_name_in_different_domains(self):

        new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(new_domain['id'], new_domain)

        group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID,

                  'name': uuid.uuid4().hex}

        group2 = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],

                  'name': group1['name']}

        self.identity_api.create_group(group1['id'], group1)

        self.identity_api.create_group(group2['id'], group2)

**** CubicPower OpenStack Study ****

    def test_move_group_between_domains(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        group = {'id': uuid.uuid4().hex,

                 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id']}

        self.identity_api.create_group(group['id'], group)

        group['domain_id'] = domain2['id']

        self.identity_api.update_group(group['id'], group)

**** CubicPower OpenStack Study ****

    def test_move_group_between_domains_with_clashing_names_fails(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        # First, create a group in domain1

        group1 = {'id': uuid.uuid4().hex,

                  'name': uuid.uuid4().hex,

                  'domain_id': domain1['id']}

        self.identity_api.create_group(group1['id'], group1)

        # Now create a group in domain2 with a potentially clashing

        # name - which should work since we have domain separation

        group2 = {'id': uuid.uuid4().hex,

                  'name': group1['name'],

                  'domain_id': domain2['id']}

        self.identity_api.create_group(group2['id'], group2)

        # Now try and move group1 into the 2nd domain - which should

        # fail since the names clash

        group1['domain_id'] = domain2['id']

        self.assertRaises(exception.Conflict,

                          self.identity_api.update_group,

                          group1['id'],

                          group1)

**** CubicPower OpenStack Study ****

    def test_project_crud(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'enabled': True}

        self.assignment_api.create_domain(domain['id'], domain)

        project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                   'domain_id': domain['id']}

        self.assignment_api.create_project(project['id'], project)

        project_ref = self.assignment_api.get_project(project['id'])

        self.assertDictContainsSubset(project, project_ref)

        project['name'] = uuid.uuid4().hex

        self.assignment_api.update_project(project['id'], project)

        project_ref = self.assignment_api.get_project(project['id'])

        self.assertDictContainsSubset(project, project_ref)

        self.assignment_api.delete_project(project['id'])

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_project,

                          project['id'])

**** CubicPower OpenStack Study ****

    def test_project_update_missing_attrs_with_a_value(self):

        # Creating a project with no description attribute.

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': DEFAULT_DOMAIN_ID,

                   'enabled': True}

        self.assignment_api.create_project(project['id'], project)

        # Add a description attribute.

        project['description'] = uuid.uuid4().hex

        self.assignment_api.update_project(project['id'], project)

        project_ref = self.assignment_api.get_project(project['id'])

        self.assertDictEqual(project_ref, project)

**** CubicPower OpenStack Study ****

    def test_project_update_missing_attrs_with_a_falsey_value(self):

        # Creating a project with no description attribute.

        project = {'id': uuid.uuid4().hex,

                   'name': uuid.uuid4().hex,

                   'domain_id': DEFAULT_DOMAIN_ID,

                   'enabled': True}

        self.assignment_api.create_project(project['id'], project)

        # Add a description attribute.

        project['description'] = ''

        self.assignment_api.update_project(project['id'], project)

        project_ref = self.assignment_api.get_project(project['id'])

        self.assertDictEqual(project_ref, project)

**** CubicPower OpenStack Study ****

    def test_domain_crud(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'enabled': True}

        self.assignment_api.create_domain(domain['id'], domain)

        domain_ref = self.assignment_api.get_domain(domain['id'])

        self.assertDictEqual(domain_ref, domain)

        domain['name'] = uuid.uuid4().hex

        self.assignment_api.update_domain(domain['id'], domain)

        domain_ref = self.assignment_api.get_domain(domain['id'])

        self.assertDictEqual(domain_ref, domain)

        # Ensure an 'enabled' domain cannot be deleted

        self.assertRaises(exception.ForbiddenAction,

                          self.assignment_api.delete_domain,

                          domain_id=domain['id'])

        # Disable the domain

        domain['enabled'] = False

        self.assignment_api.update_domain(domain['id'], domain)

        # Delete the domain

        self.assignment_api.delete_domain(domain['id'])

        # Make sure the domain no longer exists

        self.assertRaises(exception.DomainNotFound,

                          self.assignment_api.get_domain,

                          domain['id'])

**** CubicPower OpenStack Study ****

    def test_create_domain_case_sensitivity(self):

        # create a ref with a lowercase name

        ref = {

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex.lower()}

        self.assignment_api.create_domain(ref['id'], ref)

        # assign a new ID with the same name, but this time in uppercase

        ref['id'] = uuid.uuid4().hex

        ref['name'] = ref['name'].upper()

        self.assignment_api.create_domain(ref['id'], ref)

**** CubicPower OpenStack Study ****

    def test_attribute_update(self):

        project = {

            'domain_id': DEFAULT_DOMAIN_ID,

            'id': uuid.uuid4().hex,

            'name': uuid.uuid4().hex}

        self.assignment_api.create_project(project['id'], project)

        # pick a key known to be non-existent

        key = 'description'

        def assert_key_equals(value):

            project_ref = self.assignment_api.update_project(

                project['id'], project)

            self.assertEqual(project_ref[key], value)

            project_ref = self.assignment_api.get_project(project['id'])

            self.assertEqual(project_ref[key], value)

        def assert_get_key_is(value):

            project_ref = self.assignment_api.update_project(

                project['id'], project)

            self.assertIs(project_ref.get(key), value)

            project_ref = self.assignment_api.get_project(project['id'])

            self.assertIs(project_ref.get(key), value)

        # add an attribute that doesn't exist, set it to a falsey value

        value = ''

        project[key] = value

        assert_key_equals(value)

        # set an attribute with a falsey value to null

        value = None

        project[key] = value

        assert_get_key_is(value)

        # do it again, in case updating from this situation is handled oddly

        value = None

        project[key] = value

        assert_get_key_is(value)

        # set a possibly-null value to a falsey value

        value = ''

        project[key] = value

        assert_key_equals(value)

        # set a falsey value to a truthy value

        value = uuid.uuid4().hex

        project[key] = value

        assert_key_equals(value)

**** CubicPower OpenStack Study ****

        def assert_key_equals(value):

            project_ref = self.assignment_api.update_project(

                project['id'], project)

            self.assertEqual(project_ref[key], value)

            project_ref = self.assignment_api.get_project(project['id'])

            self.assertEqual(project_ref[key], value)

**** CubicPower OpenStack Study ****

        def assert_get_key_is(value):

            project_ref = self.assignment_api.update_project(

                project['id'], project)

            self.assertIs(project_ref.get(key), value)

            project_ref = self.assignment_api.get_project(project['id'])

            self.assertIs(project_ref.get(key), value)

        # add an attribute that doesn't exist, set it to a falsey value

        value = ''

        project[key] = value

        assert_key_equals(value)

        # set an attribute with a falsey value to null

        value = None

        project[key] = value

        assert_get_key_is(value)

        # do it again, in case updating from this situation is handled oddly

        value = None

        project[key] = value

        assert_get_key_is(value)

        # set a possibly-null value to a falsey value

        value = ''

        project[key] = value

        assert_key_equals(value)

        # set a falsey value to a truthy value

        value = uuid.uuid4().hex

        project[key] = value

        assert_key_equals(value)

**** CubicPower OpenStack Study ****

    def test_user_crud(self):

        user = {'domain_id': DEFAULT_DOMAIN_ID,

                'id': uuid.uuid4().hex,

                'name': uuid.uuid4().hex, 'password': 'passw0rd'}

        self.identity_api.create_user(user['id'], user)

        user_ref = self.identity_api.get_user(user['id'])

        del user['password']

        user_ref_dict = dict((x, user_ref[x]) for x in user_ref)

        self.assertDictContainsSubset(user, user_ref_dict)

        user['password'] = uuid.uuid4().hex

        self.identity_api.update_user(user['id'], user)

        user_ref = self.identity_api.get_user(user['id'])

        del user['password']

        user_ref_dict = dict((x, user_ref[x]) for x in user_ref)

        self.assertDictContainsSubset(user, user_ref_dict)

        self.identity_api.delete_user(user['id'])

        self.assertRaises(exception.UserNotFound,

                          self.identity_api.get_user,

                          user['id'])

**** CubicPower OpenStack Study ****

    def test_list_projects_for_user(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain['id'], domain)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'password': uuid.uuid4().hex, 'domain_id': domain['id'],

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        user_projects = self.assignment_api.list_projects_for_user(user1['id'])

        self.assertEqual(len(user_projects), 0)

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id=self.role_member['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=self.tenant_baz['id'],

                                         role_id=self.role_member['id'])

        user_projects = self.assignment_api.list_projects_for_user(user1['id'])

        self.assertEqual(len(user_projects), 2)

**** CubicPower OpenStack Study ****

    def test_list_projects_for_user_with_grants(self):

        # Create two groups each with a role on a different project, and

        # make user1 a member of both groups.  Both these new projects

        # should now be included, along with any direct user grants.

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain['id'], domain)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'password': uuid.uuid4().hex, 'domain_id': domain['id'],

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain['id']}

        self.identity_api.create_group(group1['id'], group1)

        group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain['id']}

        self.identity_api.create_group(group2['id'], group2)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain['id']}

        self.assignment_api.create_project(project1['id'], project1)

        project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain['id']}

        self.assignment_api.create_project(project2['id'], project2)

        self.identity_api.add_user_to_group(user1['id'], group1['id'])

        self.identity_api.add_user_to_group(user1['id'], group2['id'])

        # Create 3 grants, one user grant, the other two as group grants

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id=self.role_member['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         project_id=project1['id'],

                                         role_id=self.role_admin['id'])

        self.assignment_api.create_grant(group_id=group2['id'],

                                         project_id=project2['id'],

                                         role_id=self.role_admin['id'])

        user_projects = self.assignment_api.list_projects_for_user(user1['id'])

        self.assertEqual(len(user_projects), 3)

    @tests.skip_if_cache_disabled('assignment')

**** CubicPower OpenStack Study ****

    def test_cache_layer_domain_crud(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'enabled': True}

        domain_id = domain['id']

        # Create Domain

        self.assignment_api.create_domain(domain_id, domain)

        domain_ref = self.assignment_api.get_domain(domain_id)

        updated_domain_ref = copy.deepcopy(domain_ref)

        updated_domain_ref['name'] = uuid.uuid4().hex

        # Update domain, bypassing assignment api manager

        self.assignment_api.driver.update_domain(domain_id, updated_domain_ref)

        # Verify get_domain still returns the domain

        self.assertDictContainsSubset(

            domain_ref, self.assignment_api.get_domain(domain_id))

        # Invalidate cache

        self.assignment_api.get_domain.invalidate(self.assignment_api,

                                                  domain_id)

        # Verify get_domain returns the updated domain

        self.assertDictContainsSubset(

            updated_domain_ref, self.assignment_api.get_domain(domain_id))

        # Update the domain back to original ref, using the assignment api

        # manager

        self.assignment_api.update_domain(domain_id, domain_ref)

        self.assertDictContainsSubset(

            domain_ref, self.assignment_api.get_domain(domain_id))

        # Make sure domain is 'disabled', bypass assignment api manager

        domain_ref_disabled = domain_ref.copy()

        domain_ref_disabled['enabled'] = False

        self.assignment_api.driver.update_domain(domain_id,

                                                 domain_ref_disabled)

        # Delete domain, bypassing assignment api manager

        self.assignment_api.driver.delete_domain(domain_id)

        # Verify get_domain still returns the domain

        self.assertDictContainsSubset(

            domain_ref, self.assignment_api.get_domain(domain_id))

        # Invalidate cache

        self.assignment_api.get_domain.invalidate(self.assignment_api,

                                                  domain_id)

        # Verify get_domain now raises DomainNotFound

        self.assertRaises(exception.DomainNotFound,

                          self.assignment_api.get_domain, domain_id)

        # Recreate Domain

        self.assignment_api.create_domain(domain_id, domain)

        self.assignment_api.get_domain(domain_id)

        # Make sure domain is 'disabled', bypass assignment api manager

        domain['enabled'] = False

        self.assignment_api.driver.update_domain(domain_id, domain)

        # Delete domain

        self.assignment_api.delete_domain(domain_id)

        # verify DomainNotFound raised

        self.assertRaises(exception.DomainNotFound,

                          self.assignment_api.get_domain,

                          domain_id)

    @tests.skip_if_cache_disabled('assignment')

**** CubicPower OpenStack Study ****

    def test_cache_layer_project_crud(self):

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'enabled': True}

        project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                   'domain_id': domain['id']}

        project_id = project['id']

        self.assignment_api.create_domain(domain['id'], domain)

        # Create a project

        self.assignment_api.create_project(project_id, project)

        self.assignment_api.get_project(project_id)

        updated_project = copy.deepcopy(project)

        updated_project['name'] = uuid.uuid4().hex

        # Update project, bypassing assignment_api manager

        self.assignment_api.driver.update_project(project_id,

                                                  updated_project)

        # Verify get_project still returns the original project_ref

        self.assertDictContainsSubset(

            project, self.assignment_api.get_project(project_id))

        # Invalidate cache

        self.assignment_api.get_project.invalidate(self.assignment_api,

                                                   project_id)

        # Verify get_project now returns the new project

        self.assertDictContainsSubset(

            updated_project,

            self.assignment_api.get_project(project_id))

        # Update project using the assignment_api manager back to original

        self.assignment_api.update_project(project['id'], project)

        # Verify get_project returns the original project_ref

        self.assertDictContainsSubset(

            project, self.assignment_api.get_project(project_id))

        # Delete project bypassing assignment_api

        self.assignment_api.driver.delete_project(project_id)

        # Verify get_project still returns the project_ref

        self.assertDictContainsSubset(

            project, self.assignment_api.get_project(project_id))

        # Invalidate cache

        self.assignment_api.get_project.invalidate(self.assignment_api,

                                                   project_id)

        # Verify ProjectNotFound now raised

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_project,

                          project_id)

        # recreate project

        self.assignment_api.create_project(project_id, project)

        self.assignment_api.get_project(project_id)

        # delete project

        self.assignment_api.delete_project(project_id)

        # Verify ProjectNotFound is raised

        self.assertRaises(exception.ProjectNotFound,

                          self.assignment_api.get_project,

                          project_id)

    @tests.skip_if_cache_disabled('assignment')

**** CubicPower OpenStack Study ****

    def test_cache_layer_role_crud(self):

        role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        role_id = role['id']

        # Create role

        self.assignment_api.create_role(role_id, role)

        role_ref = self.assignment_api.get_role(role_id)

        updated_role_ref = copy.deepcopy(role_ref)

        updated_role_ref['name'] = uuid.uuid4().hex

        # Update role, bypassing the assignment api manager

        self.assignment_api.driver.update_role(role_id, updated_role_ref)

        # Verify get_role still returns old ref

        self.assertDictEqual(role_ref, self.assignment_api.get_role(role_id))

        # Invalidate Cache

        self.assignment_api.get_role.invalidate(self.assignment_api,

                                                role_id)

        # Verify get_role returns the new role_ref

        self.assertDictEqual(updated_role_ref,

                             self.assignment_api.get_role(role_id))

        # Update role back to original via the assignment api manager

        self.assignment_api.update_role(role_id, role_ref)

        # Verify get_role returns the original role ref

        self.assertDictEqual(role_ref, self.assignment_api.get_role(role_id))

        # Delete role bypassing the assignment api manager

        self.assignment_api.driver.delete_role(role_id)

        # Verify get_role still returns the role_ref

        self.assertDictEqual(role_ref, self.assignment_api.get_role(role_id))

        # Invalidate cache

        self.assignment_api.get_role.invalidate(self.assignment_api, role_id)

        # Verify RoleNotFound is now raised

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.get_role,

                          role_id)

        # recreate role

        self.assignment_api.create_role(role_id, role)

        self.assignment_api.get_role(role_id)

        # delete role via the assignment api manager

        self.assignment_api.delete_role(role_id)

        # verity RoleNotFound is now raised

        self.assertRaises(exception.RoleNotFound,

                          self.assignment_api.get_role,

                          role_id)

**** CubicPower OpenStack Study ****

    def create_user_dict(self, **attributes):

        user_dict = {'id': uuid.uuid4().hex,

                     'name': uuid.uuid4().hex,

                     'domain_id': DEFAULT_DOMAIN_ID,

                     'enabled': True}

        user_dict.update(attributes)

        return user_dict

**** CubicPower OpenStack Study ****

    def test_arbitrary_attributes_are_returned_from_create_user(self):

        attr_value = uuid.uuid4().hex

        user_data = self.create_user_dict(arbitrary_attr=attr_value)

        user = self.identity_api.create_user(user_data['id'], user_data)

        self.assertEqual(user['arbitrary_attr'], attr_value)

**** CubicPower OpenStack Study ****

    def test_arbitrary_attributes_are_returned_from_get_user(self):

        attr_value = uuid.uuid4().hex

        user_data = self.create_user_dict(arbitrary_attr=attr_value)

        self.identity_api.create_user(user_data['id'], user_data)

        user = self.identity_api.get_user(user_data['id'])

        self.assertEqual(user['arbitrary_attr'], attr_value)

**** CubicPower OpenStack Study ****

    def test_new_arbitrary_attributes_are_returned_from_update_user(self):

        user_data = self.create_user_dict()

        user = self.identity_api.create_user(user_data['id'], user_data)

        attr_value = uuid.uuid4().hex

        user['arbitrary_attr'] = attr_value

        updated_user = self.identity_api.update_user(user['id'], user)

        self.assertEqual(updated_user['arbitrary_attr'], attr_value)

**** CubicPower OpenStack Study ****

    def test_updated_arbitrary_attributes_are_returned_from_update_user(self):

        attr_value = uuid.uuid4().hex

        user_data = self.create_user_dict(arbitrary_attr=attr_value)

        new_attr_value = uuid.uuid4().hex

        user = self.identity_api.create_user(user_data['id'], user_data)

        user['arbitrary_attr'] = new_attr_value

        updated_user = self.identity_api.update_user(user['id'], user)

        self.assertEqual(updated_user['arbitrary_attr'], new_attr_value)

**** CubicPower OpenStack Study ****

    def test_create_grant_no_user(self):

        # If call create_grant with a user that doesn't exist, doesn't fail.

        self.assignment_api.create_grant(

            self.role_other['id'],

            user_id=uuid.uuid4().hex,

            project_id=self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_create_grant_no_group(self):

        # If call create_grant with a group that doesn't exist, doesn't fail.

        self.assignment_api.create_grant(

            self.role_other['id'],

            group_id=uuid.uuid4().hex,

            project_id=self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_get_default_domain_by_name(self):

        domain_name = 'default'

        domain = {'id': uuid.uuid4().hex, 'name': domain_name, 'enabled': True}

        self.assignment_api.create_domain(domain['id'], domain)

        domain_ref = self.assignment_api.get_domain_by_name(domain_name)

        self.assertEqual(domain_ref, domain)

**** CubicPower OpenStack Study ****

    def test_get_not_default_domain_by_name(self):

        domain_name = 'foo'

        self.assertRaises(exception.DomainNotFound,

                          self.assignment_api.get_domain_by_name,

                          domain_name)

**** CubicPower OpenStack Study ****

class TokenTests(object):

**** CubicPower OpenStack Study ****

    def _create_token_id(self):

        # Token must start with MII here otherwise it fails the asn1 test

        # and is not hashed in a SQL backend.

        token_id = "MII"

        for i in range(1, 20):

            token_id += uuid.uuid4().hex

        return token_id

**** CubicPower OpenStack Study ****

    def test_token_crud(self):

        token_id = self._create_token_id()

        data = {'id': token_id, 'a': 'b',

                'trust_id': None,

                'user': {'id': 'testuserid'}}

        data_ref = self.token_api.create_token(token_id, data)

        expires = data_ref.pop('expires')

        data_ref.pop('user_id')

        self.assertIsInstance(expires, datetime.datetime)

        data_ref.pop('id')

        data.pop('id')

        self.assertDictEqual(data_ref, data)

        new_data_ref = self.token_api.get_token(token_id)

        expires = new_data_ref.pop('expires')

        self.assertIsInstance(expires, datetime.datetime)

        new_data_ref.pop('user_id')

        new_data_ref.pop('id')

        self.assertEqual(new_data_ref, data)

        self.token_api.delete_token(token_id)

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token, token_id)

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.delete_token, token_id)

**** CubicPower OpenStack Study ****

    def create_token_sample_data(self, token_id=None, tenant_id=None,

                                 trust_id=None, user_id=None, expires=None):

        if token_id is None:

            token_id = self._create_token_id()

        if user_id is None:

            user_id = 'testuserid'

        # FIXME(morganfainberg): These tokens look nothing like "Real" tokens.

        # This should be updated when token_api is updated to merge in the

        # issue_token logic from the providers (token issuance should be a

        # pipeline).  The fix should be in implementation of blueprint:

        # token-issuance-pipeline

        data = {'id': token_id, 'a': 'b',

                'user': {'id': user_id}}

        if tenant_id is not None:

            data['tenant'] = {'id': tenant_id, 'name': tenant_id}

        if tenant_id is NULL_OBJECT:

            data['tenant'] = None

        if expires is not None:

            data['expires'] = expires

        if trust_id is not None:

            data['trust_id'] = trust_id

            data.setdefault('access', {}).setdefault('trust', {})

            # Testuserid2 is used here since a trustee will be different in

            # the cases of impersonation and therefore should not match the

            # token's user_id.

            data['access']['trust']['trustee_user_id'] = 'testuserid2'

        data['token_version'] = provider.V2

        # Issue token stores a copy of all token data at token['token_data'].

        # This emulates that assumption as part of the test.

        data['token_data'] = copy.deepcopy(data)

        new_token = self.token_api.create_token(token_id, data)

        return new_token['id'], data

**** CubicPower OpenStack Study ****

    def test_delete_tokens(self):

        tokens = self.token_api._list_tokens('testuserid')

        self.assertEqual(len(tokens), 0)

        token_id1, data = self.create_token_sample_data(

            tenant_id='testtenantid')

        token_id2, data = self.create_token_sample_data(

            tenant_id='testtenantid')

        token_id3, data = self.create_token_sample_data(

            tenant_id='testtenantid',

            user_id='testuserid1')

        tokens = self.token_api._list_tokens('testuserid')

        self.assertEqual(len(tokens), 2)

        self.assertIn(token_id2, tokens)

        self.assertIn(token_id1, tokens)

        self.token_api.delete_tokens(user_id='testuserid',

                                     tenant_id='testtenantid')

        tokens = self.token_api._list_tokens('testuserid')

        self.assertEqual(len(tokens), 0)

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token, token_id1)

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token, token_id2)

        self.token_api.get_token(token_id3)

**** CubicPower OpenStack Study ****

    def test_delete_tokens_trust(self):

        tokens = self.token_api._list_tokens(user_id='testuserid')

        self.assertEqual(len(tokens), 0)

        token_id1, data = self.create_token_sample_data(

            tenant_id='testtenantid',

            trust_id='testtrustid')

        token_id2, data = self.create_token_sample_data(

            tenant_id='testtenantid',

            user_id='testuserid1',

            trust_id='testtrustid1')

        tokens = self.token_api._list_tokens('testuserid')

        self.assertEqual(len(tokens), 1)

        self.assertIn(token_id1, tokens)

        self.token_api.delete_tokens(user_id='testuserid',

                                     tenant_id='testtenantid',

                                     trust_id='testtrustid')

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token, token_id1)

        self.token_api.get_token(token_id2)

**** CubicPower OpenStack Study ****

    def _test_token_list(self, token_list_fn):

        tokens = token_list_fn('testuserid')

        self.assertEqual(len(tokens), 0)

        token_id1, data = self.create_token_sample_data()

        tokens = token_list_fn('testuserid')

        self.assertEqual(len(tokens), 1)

        self.assertIn(token_id1, tokens)

        token_id2, data = self.create_token_sample_data()

        tokens = token_list_fn('testuserid')

        self.assertEqual(len(tokens), 2)

        self.assertIn(token_id2, tokens)

        self.assertIn(token_id1, tokens)

        self.token_api.delete_token(token_id1)

        tokens = token_list_fn('testuserid')

        self.assertIn(token_id2, tokens)

        self.assertNotIn(token_id1, tokens)

        self.token_api.delete_token(token_id2)

        tokens = token_list_fn('testuserid')

        self.assertNotIn(token_id2, tokens)

        self.assertNotIn(token_id1, tokens)

        # tenant-specific tokens

        tenant1 = uuid.uuid4().hex

        tenant2 = uuid.uuid4().hex

        token_id3, data = self.create_token_sample_data(tenant_id=tenant1)

        token_id4, data = self.create_token_sample_data(tenant_id=tenant2)

        # test for existing but empty tenant (LP:1078497)

        token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT)

        tokens = token_list_fn('testuserid')

        self.assertEqual(len(tokens), 3)

        self.assertNotIn(token_id1, tokens)

        self.assertNotIn(token_id2, tokens)

        self.assertIn(token_id3, tokens)

        self.assertIn(token_id4, tokens)

        self.assertIn(token_id5, tokens)

        tokens = token_list_fn('testuserid', tenant2)

        self.assertEqual(len(tokens), 1)

        self.assertNotIn(token_id1, tokens)

        self.assertNotIn(token_id2, tokens)

        self.assertNotIn(token_id3, tokens)

        self.assertIn(token_id4, tokens)

**** CubicPower OpenStack Study ****

    def test_token_list(self):

        self._test_token_list(self.token_api._list_tokens)

**** CubicPower OpenStack Study ****

    def test_token_list_deprecated_public_interface(self):

        # TODO(morganfainberg): Remove once token_api.list_tokens is removed

        # (post Icehouse release)

        self._test_token_list(self.token_api.list_tokens)

**** CubicPower OpenStack Study ****

    def test_token_list_trust(self):

        trust_id = uuid.uuid4().hex

        token_id5, data = self.create_token_sample_data(trust_id=trust_id)

        tokens = self.token_api._list_tokens('testuserid', trust_id=trust_id)

        self.assertEqual(len(tokens), 1)

        self.assertIn(token_id5, tokens)

**** CubicPower OpenStack Study ****

    def test_get_token_404(self):

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token,

                          uuid.uuid4().hex)

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token,

                          None)

**** CubicPower OpenStack Study ****

    def test_delete_token_404(self):

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.delete_token,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_expired_token(self):

        token_id = uuid.uuid4().hex

        expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1)

        data = {'id_hash': token_id, 'id': token_id, 'a': 'b',

                'expires': expire_time,

                'trust_id': None,

                'user': {'id': 'testuserid'}}

        data_ref = self.token_api.create_token(token_id, data)

        data_ref.pop('user_id')

        self.assertDictEqual(data_ref, data)

        self.assertRaises(exception.TokenNotFound,

                          self.token_api.get_token, token_id)

**** CubicPower OpenStack Study ****

    def test_null_expires_token(self):

        token_id = uuid.uuid4().hex

        data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None,

                'user': {'id': 'testuserid'}}

        data_ref = self.token_api.create_token(token_id, data)

        self.assertIsNotNone(data_ref['expires'])

        new_data_ref = self.token_api.get_token(token_id)

        # MySQL doesn't store microseconds, so discard them before testing

        data_ref['expires'] = data_ref['expires'].replace(microsecond=0)

        new_data_ref['expires'] = new_data_ref['expires'].replace(

            microsecond=0)

        self.assertEqual(data_ref, new_data_ref)

**** CubicPower OpenStack Study ****

    def check_list_revoked_tokens(self, token_ids):

        revoked_ids = [x['id'] for x in self.token_api.list_revoked_tokens()]

        for token_id in token_ids:

            self.assertIn(token_id, revoked_ids)

**** CubicPower OpenStack Study ****

    def delete_token(self):

        token_id = uuid.uuid4().hex

        data = {'id_hash': token_id, 'id': token_id, 'a': 'b',

                'user': {'id': 'testuserid'}}

        data_ref = self.token_api.create_token(token_id, data)

        self.token_api.delete_token(token_id)

        self.assertRaises(

            exception.TokenNotFound,

            self.token_api.get_token,

            data_ref['id'])

        self.assertRaises(

            exception.TokenNotFound,

            self.token_api.delete_token,

            data_ref['id'])

        return token_id

**** CubicPower OpenStack Study ****

    def test_list_revoked_tokens_returns_empty_list(self):

        revoked_ids = [x['id'] for x in self.token_api.list_revoked_tokens()]

        self.assertEqual(revoked_ids, [])

**** CubicPower OpenStack Study ****

    def test_list_revoked_tokens_for_single_token(self):

        self.check_list_revoked_tokens([self.delete_token()])

**** CubicPower OpenStack Study ****

    def test_list_revoked_tokens_for_multiple_tokens(self):

        self.check_list_revoked_tokens([self.delete_token()

                                        for x in six.moves.range(2)])

**** CubicPower OpenStack Study ****

    def test_flush_expired_token(self):

        token_id = uuid.uuid4().hex

        expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1)

        data = {'id_hash': token_id, 'id': token_id, 'a': 'b',

                'expires': expire_time,

                'trust_id': None,

                'user': {'id': 'testuserid'}}

        data_ref = self.token_api.create_token(token_id, data)

        data_ref.pop('user_id')

        self.assertDictEqual(data_ref, data)

        token_id = uuid.uuid4().hex

        expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)

        data = {'id_hash': token_id, 'id': token_id, 'a': 'b',

                'expires': expire_time,

                'trust_id': None,

                'user': {'id': 'testuserid'}}

        data_ref = self.token_api.create_token(token_id, data)

        data_ref.pop('user_id')

        self.assertDictEqual(data_ref, data)

        self.token_api.flush_expired_tokens()

        tokens = self.token_api._list_tokens('testuserid')

        self.assertEqual(len(tokens), 1)

        self.assertIn(token_id, tokens)

    @tests.skip_if_cache_disabled('token')

**** CubicPower OpenStack Study ****

    def test_revocation_list_cache(self):

        expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10)

        token_id = uuid.uuid4().hex

        token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b',

                      'expires': expire_time,

                      'trust_id': None,

                      'user': {'id': 'testuserid'}}

        token2_id = uuid.uuid4().hex

        token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b',

                       'expires': expire_time,

                       'trust_id': None,

                       'user': {'id': 'testuserid'}}

        # Create 2 Tokens.

        self.token_api.create_token(token_id, token_data)

        self.token_api.create_token(token2_id, token2_data)

        # Verify the revocation list is empty.

        self.assertEqual([], self.token_api.list_revoked_tokens())

        # Delete a token directly, bypassing the manager.

        self.token_api.driver.delete_token(token_id)

        # Verify the revocation list is still empty.

        self.assertEqual([], self.token_api.list_revoked_tokens())

        # Invalidate the revocation list.

        self.token_api.invalidate_revocation_list()

        # Verify the deleted token is in the revocation list.

        revoked_tokens = [x['id']

                          for x in self.token_api.list_revoked_tokens()]

        self.assertIn(token_id, revoked_tokens)

        # Delete the second token, through the manager

        self.token_api.delete_token(token2_id)

        revoked_tokens = [x['id']

                          for x in self.token_api.list_revoked_tokens()]

        # Verify both tokens are in the revocation list.

        self.assertIn(token_id, revoked_tokens)

        self.assertIn(token2_id, revoked_tokens)

**** CubicPower OpenStack Study ****

    def test_predictable_revoked_pki_token_id(self):

        token_id = self._create_token_id()

        token_id_hash = hashlib.md5(token_id).hexdigest()

        token = {'user': {'id': uuid.uuid4().hex}}

        self.token_api.create_token(token_id, token)

        self.token_api.delete_token(token_id)

        revoked_ids = [x['id'] for x in self.token_api.list_revoked_tokens()]

        self.assertIn(token_id_hash, revoked_ids)

        self.assertNotIn(token_id, revoked_ids)

        for t in self.token_api.list_revoked_tokens():

            self.assertIn('expires', t)

**** CubicPower OpenStack Study ****

    def test_predictable_revoked_uuid_token_id(self):

        token_id = uuid.uuid4().hex

        token = {'user': {'id': uuid.uuid4().hex}}

        self.token_api.create_token(token_id, token)

        self.token_api.delete_token(token_id)

        revoked_ids = [x['id'] for x in self.token_api.list_revoked_tokens()]

        self.assertIn(token_id, revoked_ids)

        for t in self.token_api.list_revoked_tokens():

            self.assertIn('expires', t)

**** CubicPower OpenStack Study ****

    def test_create_unicode_token_id(self):

        token_id = six.text_type(self._create_token_id())

        self.create_token_sample_data(token_id=token_id)

        self.token_api.get_token(token_id)

**** CubicPower OpenStack Study ****

    def test_create_unicode_user_id(self):

        user_id = six.text_type(uuid.uuid4().hex)

        token_id, data = self.create_token_sample_data(user_id=user_id)

        self.token_api.get_token(token_id)

**** CubicPower OpenStack Study ****

    def test_list_tokens_unicode_user_id(self):

        user_id = six.text_type(uuid.uuid4().hex)

        self.token_api.list_tokens(user_id)

**** CubicPower OpenStack Study ****

    def test_token_expire_timezone(self):

        @test_utils.timezone

        def _create_token(expire_time):

            token_id = uuid.uuid4().hex

            user_id = six.text_type(uuid.uuid4().hex)

            return self.create_token_sample_data(token_id=token_id,

                                                 user_id=user_id,

                                                 expires=expire_time)

        for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:

            test_utils.TZ = 'UTC' + d

            expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)

            token_id, data_in = _create_token(expire_time)

            data_get = self.token_api.get_token(token_id)

            self.assertEqual(data_in['id'], data_get['id'],

                             'TZ=%s' % test_utils.TZ)

            expire_time_expired = (

                timeutils.utcnow() + datetime.timedelta(minutes=-1))

            token_id, data_in = _create_token(expire_time_expired)

            self.assertRaises(exception.TokenNotFound,

                              self.token_api.get_token, data_in['id'])

**** CubicPower OpenStack Study ****

        def _create_token(expire_time):

            token_id = uuid.uuid4().hex

            user_id = six.text_type(uuid.uuid4().hex)

            return self.create_token_sample_data(token_id=token_id,

                                                 user_id=user_id,

                                                 expires=expire_time)

        for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:

            test_utils.TZ = 'UTC' + d

            expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)

            token_id, data_in = _create_token(expire_time)

            data_get = self.token_api.get_token(token_id)

            self.assertEqual(data_in['id'], data_get['id'],

                             'TZ=%s' % test_utils.TZ)

            expire_time_expired = (

                timeutils.utcnow() + datetime.timedelta(minutes=-1))

            token_id, data_in = _create_token(expire_time_expired)

            self.assertRaises(exception.TokenNotFound,

                              self.token_api.get_token, data_in['id'])

**** CubicPower OpenStack Study ****

class TokenCacheInvalidation(object):

**** CubicPower OpenStack Study ****

    def _create_test_data(self):

        self.user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                     'password': uuid.uuid4().hex,

                     'domain_id': DEFAULT_DOMAIN_ID, 'enabled': True}

        self.tenant = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                       'domain_id': DEFAULT_DOMAIN_ID, 'enabled': True}

        # Create an equivalent of a scoped token

        token_dict = {'user': self.user, 'tenant': self.tenant,

                      'metadata': {}, 'id': 'placeholder'}

        token_id, data = self.token_provider_api.issue_v2_token(token_dict)

        self.scoped_token_id = token_id

        # ..and an un-scoped one

        token_dict = {'user': self.user, 'tenant': None,

                      'metadata': {}, 'id': 'placeholder'}

        token_id, data = self.token_provider_api.issue_v2_token(token_dict)

        self.unscoped_token_id = token_id

        # Validate them, in the various ways possible - this will load the

        # responses into the token cache.

        self._check_scoped_tokens_are_valid()

        self._check_unscoped_tokens_are_valid()

**** CubicPower OpenStack Study ****

    def _check_unscoped_tokens_are_invalid(self):

        self.assertRaises(

            exception.TokenNotFound,

            self.token_provider_api.validate_token,

            self.unscoped_token_id)

        self.assertRaises(

            exception.TokenNotFound,

            self.token_provider_api.validate_v2_token,

            self.unscoped_token_id)

**** CubicPower OpenStack Study ****

    def _check_scoped_tokens_are_invalid(self):

        self.assertRaises(

            exception.TokenNotFound,

            self.token_provider_api.validate_token,

            self.scoped_token_id)

        self.assertRaises(

            exception.TokenNotFound,

            self.token_provider_api.validate_token,

            self.scoped_token_id,

            self.tenant['id'])

        self.assertRaises(

            exception.TokenNotFound,

            self.token_provider_api.validate_v2_token,

            self.scoped_token_id)

        self.assertRaises(

            exception.TokenNotFound,

            self.token_provider_api.validate_v2_token,

            self.scoped_token_id,

            self.tenant['id'])

**** CubicPower OpenStack Study ****

    def _check_scoped_tokens_are_valid(self):

        self.token_provider_api.validate_token(self.scoped_token_id)

        self.token_provider_api.validate_token(

            self.scoped_token_id, belongs_to=self.tenant['id'])

        self.token_provider_api.validate_v2_token(self.scoped_token_id)

        self.token_provider_api.validate_v2_token(

            self.scoped_token_id, belongs_to=self.tenant['id'])

**** CubicPower OpenStack Study ****

    def _check_unscoped_tokens_are_valid(self):

        self.token_provider_api.validate_token(self.unscoped_token_id)

        self.token_provider_api.validate_v2_token(self.unscoped_token_id)

**** CubicPower OpenStack Study ****

    def test_delete_unscoped_token(self):

        self.token_api.delete_token(self.unscoped_token_id)

        self._check_unscoped_tokens_are_invalid()

        self._check_scoped_tokens_are_valid()

**** CubicPower OpenStack Study ****

    def test_delete_scoped_token_by_id(self):

        self.token_api.delete_token(self.scoped_token_id)

        self._check_scoped_tokens_are_invalid()

        self._check_unscoped_tokens_are_valid()

**** CubicPower OpenStack Study ****

    def test_delete_scoped_token_by_user(self):

        self.token_api.delete_tokens(self.user['id'])

        # Since we are deleting all tokens for this user, they should all

        # now be invalid.

        self._check_scoped_tokens_are_invalid()

        self._check_unscoped_tokens_are_invalid()

**** CubicPower OpenStack Study ****

    def test_delete_scoped_token_by_user_and_tenant(self):

        self.token_api.delete_tokens(self.user['id'],

                                     tenant_id=self.tenant['id'])

        self._check_scoped_tokens_are_invalid()

        self._check_unscoped_tokens_are_valid()

**** CubicPower OpenStack Study ****

class TrustTests(object):

**** CubicPower OpenStack Study ****

    def create_sample_trust(self, new_id, remaining_uses=None):

        self.trustor = self.user_foo

        self.trustee = self.user_two

        trust_data = (self.trust_api.create_trust

                      (new_id,

                       {'trustor_user_id': self.trustor['id'],

                        'trustee_user_id': self.user_two['id'],

                        'project_id': self.tenant_bar['id'],

                        'expires_at': timeutils.

                        parse_isotime('2031-02-18T18:10:00Z'),

                        'impersonation': True,

                        'remaining_uses': remaining_uses},

                       roles=[{"id": "member"},

                              {"id": "other"},

                              {"id": "browser"}]))

        return trust_data

**** CubicPower OpenStack Study ****

    def test_delete_trust(self):

        new_id = uuid.uuid4().hex

        trust_data = self.create_sample_trust(new_id)

        trust_id = trust_data['id']

        self.assertIsNotNone(trust_data)

        trust_data = self.trust_api.get_trust(trust_id)

        self.assertEqual(new_id, trust_data['id'])

        self.trust_api.delete_trust(trust_id)

        self.assertIsNone(self.trust_api.get_trust(trust_id))

**** CubicPower OpenStack Study ****

    def test_delete_trust_not_found(self):

        trust_id = uuid.uuid4().hex

        self.assertRaises(exception.TrustNotFound,

                          self.trust_api.delete_trust,

                          trust_id)

**** CubicPower OpenStack Study ****

    def test_get_trust(self):

        new_id = uuid.uuid4().hex

        trust_data = self.create_sample_trust(new_id)

        trust_id = trust_data['id']

        self.assertIsNotNone(trust_data)

        trust_data = self.trust_api.get_trust(trust_id)

        self.assertEqual(new_id, trust_data['id'])

**** CubicPower OpenStack Study ****

    def test_create_trust(self):

        new_id = uuid.uuid4().hex

        trust_data = self.create_sample_trust(new_id)

        self.assertEqual(new_id, trust_data['id'])

        self.assertEqual(self.trustee['id'], trust_data['trustee_user_id'])

        self.assertEqual(self.trustor['id'], trust_data['trustor_user_id'])

        self.assertTrue(timeutils.normalize_time(trust_data['expires_at']) >

                        timeutils.utcnow())

        self.assertEqual([{'id': 'member'},

                          {'id': 'other'},

                          {'id': 'browser'}], trust_data['roles'])

**** CubicPower OpenStack Study ****

    def test_list_trust_by_trustee(self):

        for i in range(3):

            self.create_sample_trust(uuid.uuid4().hex)

        trusts = self.trust_api.list_trusts_for_trustee(self.trustee['id'])

        self.assertEqual(len(trusts), 3)

        self.assertEqual(trusts[0]["trustee_user_id"], self.trustee['id'])

        trusts = self.trust_api.list_trusts_for_trustee(self.trustor['id'])

        self.assertEqual(len(trusts), 0)

**** CubicPower OpenStack Study ****

    def test_list_trust_by_trustor(self):

        for i in range(3):

            self.create_sample_trust(uuid.uuid4().hex)

        trusts = self.trust_api.list_trusts_for_trustor(self.trustor['id'])

        self.assertEqual(len(trusts), 3)

        self.assertEqual(trusts[0]["trustor_user_id"], self.trustor['id'])

        trusts = self.trust_api.list_trusts_for_trustor(self.trustee['id'])

        self.assertEqual(len(trusts), 0)

**** CubicPower OpenStack Study ****

    def test_list_trusts(self):

        for i in range(3):

            self.create_sample_trust(uuid.uuid4().hex)

        trusts = self.trust_api.list_trusts()

        self.assertEqual(len(trusts), 3)

**** CubicPower OpenStack Study ****

    def test_trust_has_remaining_uses_positive(self):

        # create a trust with limited uses, check that we have uses left

        trust_data = self.create_sample_trust(uuid.uuid4().hex,

                                              remaining_uses=5)

        self.assertEqual(5, trust_data['remaining_uses'])

        # create a trust with unlimited uses, check that we have uses left

        trust_data = self.create_sample_trust(uuid.uuid4().hex)

        self.assertIsNone(trust_data['remaining_uses'])

**** CubicPower OpenStack Study ****

    def test_trust_has_remaining_uses_negative(self):

        # try to create a trust with no remaining uses, check that it fails

        self.assertRaises(exception.ValidationError,

                          self.create_sample_trust,

                          uuid.uuid4().hex,

                          remaining_uses=0)

        # try to create a trust with negative remaining uses,

        # check that it fails

        self.assertRaises(exception.ValidationError,

                          self.create_sample_trust,

                          uuid.uuid4().hex,

                          remaining_uses=-12)

**** CubicPower OpenStack Study ****

    def test_consume_use(self):

        # consume a trust repeatedly until it has no uses anymore

        trust_data = self.create_sample_trust(uuid.uuid4().hex,

                                              remaining_uses=2)

        self.trust_api.consume_use(trust_data['id'])

        t = self.trust_api.get_trust(trust_data['id'])

        self.assertEqual(1, t['remaining_uses'])

        self.trust_api.consume_use(trust_data['id'])

        # This was the last use, the trust isn't available anymore

        self.assertIsNone(self.trust_api.get_trust(trust_data['id']))

**** CubicPower OpenStack Study ****

class CommonHelperTests(tests.TestCase):

**** CubicPower OpenStack Study ****

    def test_format_helper_raises_malformed_on_missing_key(self):

        self.assertRaises(exception.MalformedEndpoint,

                          core.format_url,

                          "http://%(foo)s/%(bar)s",

                          {"foo": "1"})

**** CubicPower OpenStack Study ****

    def test_format_helper_raises_malformed_on_wrong_type(self):

        self.assertRaises(exception.MalformedEndpoint,

                          core.format_url,

                          "http://%foo%s",

                          {"foo": "1"})

**** CubicPower OpenStack Study ****

    def test_format_helper_raises_malformed_on_incomplete_format(self):

        self.assertRaises(exception.MalformedEndpoint,

                          core.format_url,

                          "http://%(foo)",

                          {"foo": "1"})

**** CubicPower OpenStack Study ****

class CatalogTests(object):

**** CubicPower OpenStack Study ****

    def test_region_crud(self):

        # create

        region_id = uuid.uuid4().hex

        new_region = {

            'id': region_id,

            'description': uuid.uuid4().hex,

        }

        res = self.catalog_api.create_region(

            new_region.copy())

        # Ensure that we don't need to have a

        # parent_region_id in the original supplied

        # ref dict, but that it will be returned from

        # the endpoint, with None value.

        expected_region = new_region.copy()

        expected_region['parent_region_id'] = None

        self.assertDictEqual(res, expected_region)

        # Test adding another region with the one above

        # as its parent. We will check below whether deleting

        # the parent successfully deletes any child regions.

        parent_region_id = region_id

        region_id = uuid.uuid4().hex

        new_region = {

            'id': region_id,

            'description': uuid.uuid4().hex,

            'parent_region_id': parent_region_id

        }

        self.catalog_api.create_region(

            new_region.copy())

        # list

        regions = self.catalog_api.list_regions()

        self.assertThat(regions, matchers.HasLength(2))

        region_ids = [x['id'] for x in regions]

        self.assertIn(parent_region_id, region_ids)

        self.assertIn(region_id, region_ids)

        # delete

        self.catalog_api.delete_region(parent_region_id)

        self.assertRaises(exception.RegionNotFound,

                          self.catalog_api.delete_region,

                          parent_region_id)

        self.assertRaises(exception.RegionNotFound,

                          self.catalog_api.get_region,

                          parent_region_id)

        # Ensure the child is also gone...

        self.assertRaises(exception.RegionNotFound,

                          self.catalog_api.get_region,

                          region_id)

**** CubicPower OpenStack Study ****

    def test_create_region_with_duplicate_id(self):

        region_id = uuid.uuid4().hex

        new_region = {

            'id': region_id,

            'description': uuid.uuid4().hex

        }

        self.catalog_api.create_region(new_region)

        # Create region again with duplicate id

        self.assertRaises(exception.Conflict,

                          self.catalog_api.create_region,

                          new_region)

**** CubicPower OpenStack Study ****

    def test_get_region_404(self):

        self.assertRaises(exception.RegionNotFound,

                          self.catalog_api.get_region,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_delete_region_404(self):

        self.assertRaises(exception.RegionNotFound,

                          self.catalog_api.delete_region,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_create_region_invalid_parent_region_404(self):

        region_id = uuid.uuid4().hex

        new_region = {

            'id': region_id,

            'description': uuid.uuid4().hex,

            'parent_region_id': 'nonexisting'

        }

        self.assertRaises(exception.RegionNotFound,

                          self.catalog_api.create_region,

                          new_region)

**** CubicPower OpenStack Study ****

    def test_service_crud(self):

        # create

        service_id = uuid.uuid4().hex

        new_service = {

            'id': service_id,

            'type': uuid.uuid4().hex,

            'name': uuid.uuid4().hex,

            'description': uuid.uuid4().hex,

        }

        res = self.catalog_api.create_service(

            service_id,

            new_service.copy())

        new_service['enabled'] = True

        self.assertDictEqual(new_service, res)

        # list

        services = self.catalog_api.list_services()

        self.assertIn(service_id, [x['id'] for x in services])

        # delete

        self.catalog_api.delete_service(service_id)

        self.assertRaises(exception.ServiceNotFound,

                          self.catalog_api.delete_service,

                          service_id)

        self.assertRaises(exception.ServiceNotFound,

                          self.catalog_api.get_service,

                          service_id)

**** CubicPower OpenStack Study ****

    def test_delete_service_with_endpoint(self):

        # create a service

        service = {

            'id': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

            'name': uuid.uuid4().hex,

            'description': uuid.uuid4().hex,

        }

        self.catalog_api.create_service(service['id'], service)

        # create an endpoint attached to the service

        endpoint = {

            'id': uuid.uuid4().hex,

            'region': uuid.uuid4().hex,

            'interface': uuid.uuid4().hex[:8],

            'url': uuid.uuid4().hex,

            'service_id': service['id'],

        }

        self.catalog_api.create_endpoint(endpoint['id'], endpoint)

        # deleting the service should also delete the endpoint

        self.catalog_api.delete_service(service['id'])

        self.assertRaises(exception.EndpointNotFound,

                          self.catalog_api.get_endpoint,

                          endpoint['id'])

        self.assertRaises(exception.EndpointNotFound,

                          self.catalog_api.delete_endpoint,

                          endpoint['id'])

**** CubicPower OpenStack Study ****

    def test_get_service_404(self):

        self.assertRaises(exception.ServiceNotFound,

                          self.catalog_api.get_service,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_delete_service_404(self):

        self.assertRaises(exception.ServiceNotFound,

                          self.catalog_api.delete_service,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_create_endpoint_404(self):

        endpoint = {

            'id': uuid.uuid4().hex,

            'service_id': uuid.uuid4().hex,

        }

        self.assertRaises(exception.ServiceNotFound,

                          self.catalog_api.create_endpoint,

                          endpoint['id'],

                          endpoint)

**** CubicPower OpenStack Study ****

    def test_get_endpoint_404(self):

        self.assertRaises(exception.EndpointNotFound,

                          self.catalog_api.get_endpoint,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_delete_endpoint_404(self):

        self.assertRaises(exception.EndpointNotFound,

                          self.catalog_api.delete_endpoint,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_create_endpoint(self):

        service = {

            'id': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

            'name': uuid.uuid4().hex,

            'description': uuid.uuid4().hex,

        }

        self.catalog_api.create_service(service['id'], service.copy())

        endpoint = {

            'id': uuid.uuid4().hex,

            'region': "0" * 255,

            'service_id': service['id'],

            'interface': 'public',

            'url': uuid.uuid4().hex,

        }

        self.catalog_api.create_endpoint(endpoint['id'], endpoint.copy())

**** CubicPower OpenStack Study ****

    def _create_endpoints(self):

        # Creates a service and 2 endpoints for the service in the same region.

        # The 'public' interface is enabled and the 'internal' interface is

        # disabled.

        def create_endpoint(service_id, region, **kwargs):

            id_ = uuid.uuid4().hex

            ref = {

                'id': id_,

                'interface': 'public',

                'region': region,

                'service_id': service_id,

                'url': 'http://localhost/%s' % uuid.uuid4().hex,

            }

            ref.update(kwargs)

            self.catalog_api.create_endpoint(id_, ref)

            return ref

        # Create a service for use with the endpoints.

        service_id = uuid.uuid4().hex

        service_ref = {

            'id': service_id,

            'name': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

        }

        self.catalog_api.create_service(service_id, service_ref)

        region = uuid.uuid4().hex

        # Create endpoints

        enabled_endpoint_ref = create_endpoint(service_id, region)

        disabled_endpoint_ref = create_endpoint(

            service_id, region, enabled=False, interface='internal')

        return service_ref, enabled_endpoint_ref, disabled_endpoint_ref

**** CubicPower OpenStack Study ****

        def create_endpoint(service_id, region, **kwargs):

            id_ = uuid.uuid4().hex

            ref = {

                'id': id_,

                'interface': 'public',

                'region': region,

                'service_id': service_id,

                'url': 'http://localhost/%s' % uuid.uuid4().hex,

            }

            ref.update(kwargs)

            self.catalog_api.create_endpoint(id_, ref)

            return ref

        # Create a service for use with the endpoints.

        service_id = uuid.uuid4().hex

        service_ref = {

            'id': service_id,

            'name': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

        }

        self.catalog_api.create_service(service_id, service_ref)

        region = uuid.uuid4().hex

        # Create endpoints

        enabled_endpoint_ref = create_endpoint(service_id, region)

        disabled_endpoint_ref = create_endpoint(

            service_id, region, enabled=False, interface='internal')

        return service_ref, enabled_endpoint_ref, disabled_endpoint_ref

**** CubicPower OpenStack Study ****

    def test_get_catalog_endpoint_disabled(self):

        """Get back only enabled endpoints when get the v2 catalog."""

        service_ref, enabled_endpoint_ref, dummy_disabled_endpoint_ref = (

            self._create_endpoints())

        user_id = uuid.uuid4().hex

        project_id = uuid.uuid4().hex

        catalog = self.catalog_api.get_catalog(user_id, project_id)

        exp_entry = {

            'id': enabled_endpoint_ref['id'],

            'name': service_ref['name'],

            'publicURL': enabled_endpoint_ref['url'],

        }

        region = enabled_endpoint_ref['region']

        self.assertEqual(exp_entry, catalog[region][service_ref['type']])

**** CubicPower OpenStack Study ****

    def test_get_v3_catalog_endpoint_disabled(self):

        """Get back only enabled endpoints when get the v3 catalog."""

        enabled_endpoint_ref = self._create_endpoints()[1]

        user_id = uuid.uuid4().hex

        project_id = uuid.uuid4().hex

        catalog = self.catalog_api.get_v3_catalog(user_id, project_id)

        endpoint_ids = [x['id'] for x in catalog[0]['endpoints']]

        self.assertEqual([enabled_endpoint_ref['id']], endpoint_ids)

**** CubicPower OpenStack Study ****

class PolicyTests(object):

**** CubicPower OpenStack Study ****

    def _new_policy_ref(self):

        return {

            'id': uuid.uuid4().hex,

            'policy': uuid.uuid4().hex,

            'type': uuid.uuid4().hex,

            'endpoint_id': uuid.uuid4().hex,

        }

**** CubicPower OpenStack Study ****

    def assertEqualPolicies(self, a, b):

        self.assertEqual(a['id'], b['id'])

        self.assertEqual(a['endpoint_id'], b['endpoint_id'])

        self.assertEqual(a['policy'], b['policy'])

        self.assertEqual(a['type'], b['type'])

**** CubicPower OpenStack Study ****

    def test_create(self):

        ref = self._new_policy_ref()

        res = self.policy_api.create_policy(ref['id'], ref)

        self.assertEqualPolicies(ref, res)

**** CubicPower OpenStack Study ****

    def test_get(self):

        ref = self._new_policy_ref()

        res = self.policy_api.create_policy(ref['id'], ref)

        res = self.policy_api.get_policy(ref['id'])

        self.assertEqualPolicies(ref, res)

**** CubicPower OpenStack Study ****

    def test_list(self):

        ref = self._new_policy_ref()

        self.policy_api.create_policy(ref['id'], ref)

        res = self.policy_api.list_policies()

        res = [x for x in res if x['id'] == ref['id']][0]

        self.assertEqualPolicies(ref, res)

**** CubicPower OpenStack Study ****

    def test_update(self):

        ref = self._new_policy_ref()

        self.policy_api.create_policy(ref['id'], ref)

        orig = ref

        ref = self._new_policy_ref()

        # (cannot change policy ID)

        self.assertRaises(exception.ValidationError,

                          self.policy_api.update_policy,

                          orig['id'],

                          ref)

        ref['id'] = orig['id']

        res = self.policy_api.update_policy(orig['id'], ref)

        self.assertEqualPolicies(ref, res)

**** CubicPower OpenStack Study ****

    def test_delete(self):

        ref = self._new_policy_ref()

        self.policy_api.create_policy(ref['id'], ref)

        self.policy_api.delete_policy(ref['id'])

        self.assertRaises(exception.PolicyNotFound,

                          self.policy_api.delete_policy,

                          ref['id'])

        self.assertRaises(exception.PolicyNotFound,

                          self.policy_api.get_policy,

                          ref['id'])

        res = self.policy_api.list_policies()

        self.assertFalse(len([x for x in res if x['id'] == ref['id']]))

**** CubicPower OpenStack Study ****

    def test_get_policy_404(self):

        self.assertRaises(exception.PolicyNotFound,

                          self.policy_api.get_policy,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_update_policy_404(self):

        ref = self._new_policy_ref()

        self.assertRaises(exception.PolicyNotFound,

                          self.policy_api.update_policy,

                          ref['id'],

                          ref)

**** CubicPower OpenStack Study ****

    def test_delete_policy_404(self):

        self.assertRaises(exception.PolicyNotFound,

                          self.policy_api.delete_policy,

                          uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

class InheritanceTests(object):

**** CubicPower OpenStack Study ****

    def test_inherited_role_grants_for_user(self):

        """Test inherited user roles.

        Test Plan:

        - Enable OS-INHERIT extension

        - Create 3 roles

        - Create a domain, with a project and a user

        - Check no roles yet exit

        - Assign a direct user role to the project and a (non-inherited)

          user role to the domain

        - Get a list of effective roles - should only get the one direct role

        - Now add an inherited user role to the domain

        - Get a list of effective roles - should have two roles, one

          direct and one by virtue of the inherited user role

        - Also get effective roles for the domain - the role marked as

          inherited should not show up

        """

        self.config_fixture.config(group='os_inherit', enabled=True)

        role_list = []

        for _ in range(3):

            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            self.assignment_api.create_role(role['id'], role)

            role_list.append(role)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        # Create the first two roles - the domain one is not inherited

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[0]['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[1]['id'])

        # Now get the effective roles for the user and project, this

        # should only include the direct role assignment on the project

        combined_list = self.assignment_api.get_roles_for_user_and_project(

            user1['id'], project1['id'])

        self.assertEqual(len(combined_list), 1)

        self.assertIn(role_list[0]['id'], combined_list)

        # Now add an inherited role on the domain

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[2]['id'],

                                         inherited_to_projects=True)

        # Now get the effective roles for the user and project again, this

        # should now include the inherited role on the domain

        combined_list = self.assignment_api.get_roles_for_user_and_project(

            user1['id'], project1['id'])

        self.assertEqual(len(combined_list), 2)

        self.assertIn(role_list[0]['id'], combined_list)

        self.assertIn(role_list[2]['id'], combined_list)

        # Finally, check that the inherited role does not appear as a valid

        # directly assigned role on the domain itself

        combined_role_list = self.assignment_api.get_roles_for_user_and_domain(

            user1['id'], domain1['id'])

        self.assertEqual(len(combined_role_list), 1)

        self.assertIn(role_list[1]['id'], combined_role_list)

**** CubicPower OpenStack Study ****

    def test_inherited_role_grants_for_group(self):

        """Test inherited group roles.

        Test Plan:

        - Enable OS-INHERIT extension

        - Create 4 roles

        - Create a domain, with a project, user and two groups

        - Make the user a member of both groups

        - Check no roles yet exit

        - Assign a direct user role to the project and a (non-inherited)

          group role on the domain

        - Get a list of effective roles - should only get the one direct role

        - Now add two inherited group roles to the domain

        - Get a list of effective roles - should have three roles, one

          direct and two by virtue of inherited group roles

        """

        self.config_fixture.config(group='os_inherit', enabled=True)

        role_list = []

        for _ in range(4):

            role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

            self.assignment_api.create_role(role['id'], role)

            role_list.append(role)

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': domain1['id'], 'password': uuid.uuid4().hex,

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group1['id'], group1)

        group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain1['id'], 'enabled': True}

        self.identity_api.create_group(group2['id'], group2)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain1['id']}

        self.assignment_api.create_project(project1['id'], project1)

        self.identity_api.add_user_to_group(user1['id'],

                                            group1['id'])

        self.identity_api.add_user_to_group(user1['id'],

                                            group2['id'])

        roles_ref = self.assignment_api.list_grants(

            user_id=user1['id'],

            project_id=project1['id'])

        self.assertEqual(len(roles_ref), 0)

        # Create two roles - the domain one is not inherited

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project1['id'],

                                         role_id=role_list[0]['id'])

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[1]['id'])

        # Now get the effective roles for the user and project, this

        # should only include the direct role assignment on the project

        combined_list = self.assignment_api.get_roles_for_user_and_project(

            user1['id'], project1['id'])

        self.assertEqual(len(combined_list), 1)

        self.assertIn(role_list[0]['id'], combined_list)

        # Now add to more group roles, both inherited, to the domain

        self.assignment_api.create_grant(group_id=group2['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[2]['id'],

                                         inherited_to_projects=True)

        self.assignment_api.create_grant(group_id=group2['id'],

                                         domain_id=domain1['id'],

                                         role_id=role_list[3]['id'],

                                         inherited_to_projects=True)

        # Now get the effective roles for the user and project again, this

        # should now include the inherited roles on the domain

        combined_list = self.assignment_api.get_roles_for_user_and_project(

            user1['id'], project1['id'])

        self.assertEqual(len(combined_list), 3)

        self.assertIn(role_list[0]['id'], combined_list)

        self.assertIn(role_list[2]['id'], combined_list)

        self.assertIn(role_list[3]['id'], combined_list)

**** CubicPower OpenStack Study ****

    def test_list_projects_for_user_with_inherited_grants(self):

        """Test inherited group roles.

        Test Plan:

        - Enable OS-INHERIT extension

        - Create a domain, with two projects and a user

        - Assign an inherited user role on the domain, as well as a direct

          user role to a separate project in a different domain

        - Get a list of projects for user, should return all three projects

        """

        self.config_fixture.config(group='os_inherit', enabled=True)

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain['id'], domain)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'password': uuid.uuid4().hex, 'domain_id': domain['id'],

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain['id']}

        self.assignment_api.create_project(project1['id'], project1)

        project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain['id']}

        self.assignment_api.create_project(project2['id'], project2)

        # Create 2 grants, one on a project and one inherited grant

        # on the domain

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id=self.role_member['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain['id'],

                                         role_id=self.role_admin['id'],

                                         inherited_to_projects=True)

        # Should get back all three projects, one by virtue of the direct

        # grant, plus both projects in the domain

        user_projects = self.assignment_api.list_projects_for_user(user1['id'])

        self.assertEqual(len(user_projects), 3)

**** CubicPower OpenStack Study ****

    def test_list_projects_for_user_with_inherited_group_grants(self):

        """Test inherited group roles.

        Test Plan:

        - Enable OS-INHERIT extension

        - Create two domains, each with two projects

        - Create a user and group

        - Make the user a member of the group

        - Assign a user role two projects, an inherited

          group role to one domain and an inherited regular role on

          the other domain

        - Get a list of projects for user, should return both pairs of projects

          from the domain, plus the one separate project

        """

        self.config_fixture.config(group='os_inherit', enabled=True)

        domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain['id'], domain)

        domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain2['id'], domain2)

        project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain['id']}

        self.assignment_api.create_project(project1['id'], project1)

        project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain['id']}

        self.assignment_api.create_project(project2['id'], project2)

        project3 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain2['id']}

        self.assignment_api.create_project(project3['id'], project3)

        project4 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                    'domain_id': domain2['id']}

        self.assignment_api.create_project(project4['id'], project4)

        user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'password': uuid.uuid4().hex, 'domain_id': domain['id'],

                 'enabled': True}

        self.identity_api.create_user(user1['id'], user1)

        group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                  'domain_id': domain['id']}

        self.identity_api.create_group(group1['id'], group1)

        self.identity_api.add_user_to_group(user1['id'], group1['id'])

        # Create 4 grants:

        # - one user grant on a project in domain2

        # - one user grant on a project in the default domain

        # - one inherited user grant on domain

        # - one inherited group grant on domain2

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=project3['id'],

                                         role_id=self.role_member['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         project_id=self.tenant_bar['id'],

                                         role_id=self.role_member['id'])

        self.assignment_api.create_grant(user_id=user1['id'],

                                         domain_id=domain['id'],

                                         role_id=self.role_admin['id'],

                                         inherited_to_projects=True)

        self.assignment_api.create_grant(group_id=group1['id'],

                                         domain_id=domain2['id'],

                                         role_id=self.role_admin['id'],

                                         inherited_to_projects=True)

        # Should get back all five projects, but without a duplicate for

        # project3 (since it has both a direct user role and an inherited role)

        user_projects = self.assignment_api.list_projects_for_user(user1['id'])

        self.assertEqual(len(user_projects), 5)

**** CubicPower OpenStack Study ****

class FilterTests(filtering.FilterTests):

**** CubicPower OpenStack Study ****

    def test_list_users_filtered(self):

        domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(domain1['id'], domain1)

        for entity in ['user', 'group', 'project']:

            # Create 20 entities, 14 of which are in domain1

            entity_list = self._create_test_data(entity, 6)

            domain1_entity_list = self._create_test_data(entity, 14,

                                                         domain1['id'])

            # Should get back the 14 entities in domain1

            hints = driver_hints.Hints()

            hints.add_filter('domain_id', domain1['id'])

            entities = self._list_entities(entity)(hints=hints)

            self.assertEqual(len(entities), 14)

            self._match_with_list(entities, domain1_entity_list)

            # Check the driver has removed the filter from the list hints

            self.assertFalse(hints.get_exact_filter_by_name('domain_id'))

            # Try filtering to get one an exact item out of the list

            hints = driver_hints.Hints()

            hints.add_filter('name', domain1_entity_list[10]['name'])

            entities = self._list_entities(entity)(hints=hints)

            self.assertEqual(len(entities), 1)

            self.assertEqual(entities[0]['id'], domain1_entity_list[10]['id'])

            # Check the driver has removed the filter from the list hints

            self.assertFalse(hints.get_exact_filter_by_name('name'))

            self._delete_test_data(entity, entity_list)

            self._delete_test_data(entity, domain1_entity_list)

**** CubicPower OpenStack Study ****

    def test_list_users_inexact_filtered(self):

        # Create 20 users

        user_list = self._create_test_data('user', 20)

        # Set up some names that we can filter on

        user = user_list[5]

        user['name'] = 'The'

        self.identity_api.update_user(user['id'], user)

        user = user_list[6]

        user['name'] = 'The Ministry'

        self.identity_api.update_user(user['id'], user)

        user = user_list[7]

        user['name'] = 'The Ministry of'

        self.identity_api.update_user(user['id'], user)

        user = user_list[8]

        user['name'] = 'The Ministry of Silly'

        self.identity_api.update_user(user['id'], user)

        user = user_list[9]

        user['name'] = 'The Ministry of Silly Walks'

        self.identity_api.update_user(user['id'], user)

        # ...and one for useful case insensitivity testing

        user = user_list[10]

        user['name'] = 'The ministry of silly walks OF'

        self.identity_api.update_user(user['id'], user)

        hints = driver_hints.Hints()

        hints.add_filter('name', 'ministry', comparator='contains')

        users = self.identity_api.list_users(hints=hints)

        self.assertEqual(len(users), 5)

        self._match_with_list(users, user_list,

                              list_start=6, list_end=11)

        #TODO(henry-nash) Check inexact filter has been removed.

        hints = driver_hints.Hints()

        hints.add_filter('name', 'The', comparator='startswith')

        users = self.identity_api.list_users(hints=hints)

        self.assertEqual(len(users), 6)

        self._match_with_list(users, user_list,

                              list_start=5, list_end=11)

        #TODO(henry-nash) Check inexact filter has been removed.

        hints = driver_hints.Hints()

        hints.add_filter('name', 'of', comparator='endswith')

        users = self.identity_api.list_users(hints=hints)

        self.assertEqual(len(users), 2)

        self.assertEqual(users[0]['id'], user_list[7]['id'])

        self.assertEqual(users[1]['id'], user_list[10]['id'])

        #TODO(henry-nash) Check inexact filter has been removed.

        # TODO(henry-nash): Add some case sensitive tests.  The issue

        # is that MySQL 0.7, by default, is installed in case

        # insensitive mode (which is what is run by default for our

        # SQL backend tests).  For production deployments. OpenStack

        # assumes a case sensitive database.  For these tests, therefore, we

        # need to be able to check the sensitivity of the database so as to

        # know whether to run case sensitive tests here.

        self._delete_test_data('user', user_list)

**** CubicPower OpenStack Study ****

    def test_filter_sql_injection_attack(self):

        """Test against sql injection attack on filters

        Test Plan:

        - Attempt to get all entities back by passing a two-term attribute

        - Attempt to piggyback filter to damage DB (e.g. drop table)

        """

        # Check we have some users

        users = self.identity_api.list_users()

        self.assertTrue(len(users) > 0)

        hints = driver_hints.Hints()

        hints.add_filter('name', "anything' or 'x'='x")

        users = self.identity_api.list_users(hints=hints)

        self.assertEqual(len(users), 0)

        # See if we can add a SQL command...use the group table instead of the

        # user table since 'user' is reserved word for SQLAlchemy.

        group = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,

                 'domain_id': DEFAULT_DOMAIN_ID}

        self.identity_api.create_group(group['id'], group)

        hints = driver_hints.Hints()

        hints.add_filter('name', "x'; drop table group")

        groups = self.identity_api.list_groups(hints=hints)

        self.assertEqual(len(groups), 0)

        groups = self.identity_api.list_groups()

        self.assertTrue(len(groups) > 0)

**** CubicPower OpenStack Study ****

class LimitTests(filtering.FilterTests):

ENTITIES = ['user', 'group', 'project']

**** CubicPower OpenStack Study ****

    def setUp(self):

        """Setup for Limit Test Cases."""

        self.domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}

        self.assignment_api.create_domain(self.domain1['id'], self.domain1)

        self.addCleanup(self.clean_up_domain)

        self.entity_lists = {}

        self.domain1_entity_lists = {}

        for entity in self.ENTITIES:

            # Create 20 entities, 14 of which are in domain1

            self.entity_lists[entity] = self._create_test_data(entity, 6)

            self.domain1_entity_lists[entity] = self._create_test_data(

                entity, 14, self.domain1['id'])

        self.addCleanup(self.clean_up_entities)

**** CubicPower OpenStack Study ****

    def clean_up_domain(self):

        """Clean up domain test data from Limit Test Cases."""

        self.domain1['enabled'] = False

        self.assignment_api.update_domain(self.domain1['id'], self.domain1)

        self.assignment_api.delete_domain(self.domain1['id'])

        del self.domain1

**** CubicPower OpenStack Study ****

    def clean_up_entities(self):

        """Clean up entity test data from Limit Test Cases."""

        for entity in self.ENTITIES:

            self._delete_test_data(entity, self.entity_lists[entity])

            self._delete_test_data(entity, self.domain1_entity_lists[entity])

        del self.entity_lists

        del self.domain1_entity_lists

**** CubicPower OpenStack Study ****

    def _test_list_entity_filtered_and_limited(self, entity):

        self.config_fixture.config(list_limit=10)

        # Should get back just 10 entities in domain1

        hints = driver_hints.Hints()

        hints.add_filter('domain_id', self.domain1['id'])

        entities = self._list_entities(entity)(hints=hints)

        self.assertEqual(len(entities), hints.get_limit()['limit'])

        self.assertTrue(hints.get_limit()['truncated'])

        self._match_with_list(entities, self.domain1_entity_lists[entity])

        # Override with driver specific limit

        if entity == 'project':

            self.config_fixture.config(group='assignment', list_limit=5)

        else:

            self.config_fixture.config(group='identity', list_limit=5)

        # Should get back just 5 users in domain1

        hints = driver_hints.Hints()

        hints.add_filter('domain_id', self.domain1['id'])

        entities = self._list_entities(entity)(hints=hints)

        self.assertEqual(len(entities), hints.get_limit()['limit'])

        self._match_with_list(entities, self.domain1_entity_lists[entity])

        # Finally, let's pretend we want to get the full list of entities,

        # even with the limits set, as part of some internal calculation.

        # Calling the API without a hints list should achieve this, and

        # return at least the 20 entries we created (there may be other

        # entities lying around created by other tests/setup).

        entities = self._list_entities(entity)()

        self.assertTrue(len(entities) >= 20)

**** CubicPower OpenStack Study ****

    def test_list_users_filtered_and_limited(self):

        self._test_list_entity_filtered_and_limited('user')

**** CubicPower OpenStack Study ****

    def test_list_groups_filtered_and_limited(self):

        self._test_list_entity_filtered_and_limited('group')

**** CubicPower OpenStack Study ****

    def test_list_projects_filtered_and_limited(self):

        self._test_list_entity_filtered_and_limited('project')