¡@

Home 

OpenStack Study: test_keystoneclient.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2012 OpenStack Foundation

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import datetime

import os

import uuid

import webob

import mock

from keystone import config

from keystone.openstack.common import jsonutils

from keystone.openstack.common import timeutils

from keystone import tests

from keystone.tests import default_fixtures

from keystone.tests.ksfixtures import appserver

CONF = config.CONF

DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id

OPENSTACK_REPO = 'https://review.openstack.org/p/openstack'

KEYSTONECLIENT_REPO = '%s/python-keystoneclient.git' % OPENSTACK_REPO

**** CubicPower OpenStack Study ****

class CompatTestCase(tests.NoModule, tests.TestCase):
    """Base test case that runs keystone servers for a keystoneclient checkout.

    ``setUp`` loads the backends and the default fixtures, starts one public
    and one admin application server, and puts the keystoneclient source tree
    described by ``self.checkout_info`` on the import path.
    """

    def setUp(self):
        """Load fixtures, start both servers and expose keystoneclient."""
        super(CompatTestCase, self).setUp()

        # FIXME(morganfainberg): Since we are running tests through the
        # controllers and some internal api drivers are SQL-only, the correct
        # approach is to ensure we have the correct backing store. The
        # credential api makes some very SQL specific assumptions that should
        # be addressed allowing for non-SQL based testing to occur.
        self.load_backends()
        self.load_fixtures(default_fixtures)

        # TODO(termie): add an admin user to the fixtures and use that user
        # override the fixtures, for now
        self.assignment_api.add_role_to_user_and_project(
            self.user_foo['id'],
            self.tenant_bar['id'],
            self.role_admin['id'])

        conf = self._paste_config('keystone')
        fixture = self.useFixture(appserver.AppServer(conf, appserver.MAIN))
        self.public_server = fixture.server
        fixture = self.useFixture(appserver.AppServer(conf, appserver.ADMIN))
        self.admin_server = fixture.server

        self.addCleanup(self.cleanup_instance('public_server', 'admin_server'))

        # NOTE(review): checkout_info is not defined in this class; presumably
        # subclasses provide either a path string or the argument tuple for
        # tests.checkout_vendor -- confirm against the subclasses.
        if isinstance(self.checkout_info, str):
            revdir = self.checkout_info
        else:
            revdir = tests.checkout_vendor(*self.checkout_info)
        self.add_path(revdir)
        self.clear_module('keystoneclient')

    def _public_url(self):
        """Return the v2.0 URL of the public server started in setUp."""
        public_port = self.public_server.socket_info['socket'][1]
        return "http://localhost:%s/v2.0" % public_port

    def _admin_url(self):
        """Return the v2.0 URL of the admin server started in setUp."""
        admin_port = self.admin_server.socket_info['socket'][1]
        return "http://localhost:%s/v2.0" % admin_port

    def _client(self, admin=False, **kwargs):
        """Build and authenticate a v2.0 keystoneclient.

        :param admin: when true the client talks to the admin endpoint,
                      otherwise to the public endpoint.
        :param kwargs: passed through to ``ks_client.Client`` (credentials,
                       token, tenant scoping, ...).
        :returns: the authenticated client.
        """
        from keystoneclient.v2_0 import client as ks_client

        url = self._admin_url() if admin else self._public_url()

        kc = ks_client.Client(endpoint=url,
                              auth_url=self._public_url(),
                              **kwargs)
        kc.authenticate()
        # have to manually overwrite the management url after authentication
        kc.management_url = url
        return kc

    def get_client(self, user_ref=None, tenant_ref=None, admin=False):
        """Return an authenticated client for a fixture user.

        :param user_ref: user dict with ``id``, ``name`` and ``password``;
                         defaults to ``self.user_foo``.
        :param tenant_ref: tenant dict whose ``id`` scopes the client. When
                           omitted, the first tenant listed for the matching
                           user in ``default_fixtures.USERS`` is used.
        :param admin: forwarded to ``_client`` to select the endpoint.
        :returns: the authenticated client.
        """
        if user_ref is None:
            user_ref = self.user_foo
        if tenant_ref is None:
            # Start from None so that a user_ref that is not part of the
            # default fixtures produces a client without tenant scoping
            # instead of an UnboundLocalError on tenant_id below.
            tenant_id = None
            for user in default_fixtures.USERS:
                if user['id'] == user_ref['id']:
                    tenant_id = user['tenants'][0]
        else:
            tenant_id = tenant_ref['id']

        return self._client(username=user_ref['name'],
                            password=user_ref['password'],
                            tenant_id=tenant_id,
                            admin=admin)

**** CubicPower OpenStack Study ****

class KeystoneClientTests(object):

"""Tests for all versions of keystoneclient."""

**** CubicPower OpenStack Study ****

    def test_authenticate_tenant_name_and_tenants(self):

        client = self.get_client()

        tenants = client.tenants.list()

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_authenticate_tenant_id_and_tenants(self):

        client = self._client(username=self.user_foo['name'],

                              password=self.user_foo['password'],

                              tenant_id='bar')

        tenants = client.tenants.list()

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_authenticate_invalid_tenant_id(self):

        from keystoneclient import exceptions as client_exceptions

        self.assertRaises(client_exceptions.Unauthorized,

                          self._client,

                          username=self.user_foo['name'],

                          password=self.user_foo['password'],

                          tenant_id='baz')

**** CubicPower OpenStack Study ****

    def test_authenticate_token_no_tenant(self):

        client = self.get_client()

        token = client.auth_token

        token_client = self._client(token=token)

        tenants = token_client.tenants.list()

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_authenticate_token_tenant_id(self):

        client = self.get_client()

        token = client.auth_token

        token_client = self._client(token=token, tenant_id='bar')

        tenants = token_client.tenants.list()

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_authenticate_token_invalid_tenant_id(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client()

        token = client.auth_token

        self.assertRaises(client_exceptions.Unauthorized,

                          self._client, token=token,

                          tenant_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_authenticate_token_invalid_tenant_name(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client()

        token = client.auth_token

        self.assertRaises(client_exceptions.Unauthorized,

                          self._client, token=token,

                          tenant_name=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_authenticate_token_tenant_name(self):

        client = self.get_client()

        token = client.auth_token

        token_client = self._client(token=token, tenant_name='BAR')

        tenants = token_client.tenants.list()

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_authenticate_and_delete_token(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        token = client.auth_token

        token_client = self._client(token=token)

        tenants = token_client.tenants.list()

        self.assertEqual(tenants[0].id, self.tenant_bar['id'])

        client.tokens.delete(token_client.auth_token)

        self.assertRaises(client_exceptions.Unauthorized,

                          token_client.tenants.list)

**** CubicPower OpenStack Study ****

    def test_authenticate_no_password(self):

        from keystoneclient import exceptions as client_exceptions

        user_ref = self.user_foo.copy()

        user_ref['password'] = None

        self.assertRaises(client_exceptions.AuthorizationFailure,

                          self.get_client,

                          user_ref)

**** CubicPower OpenStack Study ****

    def test_authenticate_no_username(self):

        from keystoneclient import exceptions as client_exceptions

        user_ref = self.user_foo.copy()

        user_ref['name'] = None

        self.assertRaises(client_exceptions.AuthorizationFailure,

                          self.get_client,

                          user_ref)

**** CubicPower OpenStack Study ****

    def test_authenticate_disabled_tenant(self):

        from keystoneclient import exceptions as client_exceptions

        admin_client = self.get_client(admin=True)

        tenant = {

            'name': uuid.uuid4().hex,

            'description': uuid.uuid4().hex,

            'enabled': False,

        }

        tenant_ref = admin_client.tenants.create(

            tenant_name=tenant['name'],

            description=tenant['description'],

            enabled=tenant['enabled'])

        tenant['id'] = tenant_ref.id

        user = {

            'name': uuid.uuid4().hex,

            'password': uuid.uuid4().hex,

            'email': uuid.uuid4().hex,

            'tenant_id': tenant['id'],

        }

        user_ref = admin_client.users.create(

            name=user['name'],

            password=user['password'],

            email=user['email'],

            tenant_id=user['tenant_id'])

        user['id'] = user_ref.id

        # password authentication

        self.assertRaises(

            client_exceptions.Unauthorized,

            self._client,

            username=user['name'],

            password=user['password'],

            tenant_id=tenant['id'])

        # token authentication

        client = self._client(

            username=user['name'],

            password=user['password'])

        self.assertRaises(

            client_exceptions.Unauthorized,

            self._client,

            token=client.auth_token,

            tenant_id=tenant['id'])

    # FIXME(ja): this test should require the "keystone:admin" roled

    #            (probably the role set via --keystone_admin_role flag)

    # FIXME(ja): add a test that admin endpoint is only sent to admin user

    # FIXME(ja): add a test that admin endpoint returns unauthorized if not

    #            admin

**** CubicPower OpenStack Study ****

    def test_tenant_create_update_and_delete(self):

        from keystoneclient import exceptions as client_exceptions

        tenant_name = 'original_tenant'

        tenant_description = 'My original tenant!'

        tenant_enabled = True

        client = self.get_client(admin=True)

        # create, get, and list a tenant

        tenant = client.tenants.create(tenant_name=tenant_name,

                                       description=tenant_description,

                                       enabled=tenant_enabled)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertEqual(tenant.enabled, tenant_enabled)

        tenant = client.tenants.get(tenant_id=tenant.id)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertEqual(tenant.enabled, tenant_enabled)

        tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertEqual(tenant.enabled, tenant_enabled)

        # update, get, and list a tenant

        tenant_name = 'updated_tenant'

        tenant_description = 'Updated tenant!'

        tenant_enabled = False

        tenant = client.tenants.update(tenant_id=tenant.id,

                                       tenant_name=tenant_name,

                                       enabled=tenant_enabled,

                                       description=tenant_description)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertEqual(tenant.enabled, tenant_enabled)

        tenant = client.tenants.get(tenant_id=tenant.id)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertEqual(tenant.enabled, tenant_enabled)

        tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertEqual(tenant.enabled, tenant_enabled)

        # delete, get, and list a tenant

        client.tenants.delete(tenant=tenant.id)

        self.assertRaises(client_exceptions.NotFound, client.tenants.get,

                          tenant.id)

        self.assertFalse([t for t in client.tenants.list()

                         if t.id == tenant.id])

**** CubicPower OpenStack Study ****

    def test_tenant_create_update_and_delete_unicode(self):

        from keystoneclient import exceptions as client_exceptions

        tenant_name = u'original \u540d\u5b57'

        tenant_description = 'My original tenant!'

        tenant_enabled = True

        client = self.get_client(admin=True)

        # create, get, and list a tenant

        tenant = client.tenants.create(tenant_name,

                                       description=tenant_description,

                                       enabled=tenant_enabled)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertIs(tenant.enabled, tenant_enabled)

        tenant = client.tenants.get(tenant.id)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertIs(tenant.enabled, tenant_enabled)

        # multiple tenants exist due to fixtures, so find the one we're testing

        tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertIs(tenant.enabled, tenant_enabled)

        # update, get, and list a tenant

        tenant_name = u'updated \u540d\u5b57'

        tenant_description = 'Updated tenant!'

        tenant_enabled = False

        tenant = client.tenants.update(tenant.id,

                                       tenant_name=tenant_name,

                                       enabled=tenant_enabled,

                                       description=tenant_description)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertIs(tenant.enabled, tenant_enabled)

        tenant = client.tenants.get(tenant.id)

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertIs(tenant.enabled, tenant_enabled)

        tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()

        self.assertEqual(tenant.name, tenant_name)

        self.assertEqual(tenant.description, tenant_description)

        self.assertIs(tenant.enabled, tenant_enabled)

        # delete, get, and list a tenant

        client.tenants.delete(tenant.id)

        self.assertRaises(client_exceptions.NotFound, client.tenants.get,

                          tenant.id)

        self.assertFalse([t for t in client.tenants.list()

                         if t.id == tenant.id])

**** CubicPower OpenStack Study ****

    def test_tenant_create_no_name(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.BadRequest,

                          client.tenants.create,

                          tenant_name="")

**** CubicPower OpenStack Study ****

    def test_tenant_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.tenants.delete,

                          tenant=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_tenant_get_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.tenants.get,

                          tenant_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_tenant_update_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.tenants.update,

                          tenant_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_tenant_list(self):

        client = self.get_client()

        tenants = client.tenants.list()

        self.assertEqual(len(tenants), 1)

        # Admin endpoint should return *all* tenants

        client = self.get_client(admin=True)

        tenants = client.tenants.list()

        self.assertEqual(len(tenants), len(default_fixtures.TENANTS))

**** CubicPower OpenStack Study ****

    def test_invalid_password(self):

        from keystoneclient import exceptions as client_exceptions

        good_client = self._client(username=self.user_foo['name'],

                                   password=self.user_foo['password'])

        good_client.tenants.list()

        self.assertRaises(client_exceptions.Unauthorized,

                          self._client,

                          username=self.user_foo['name'],

                          password=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_invalid_user_and_password(self):

        from keystoneclient import exceptions as client_exceptions

        self.assertRaises(client_exceptions.Unauthorized,

                          self._client,

                          username=uuid.uuid4().hex,

                          password=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_change_password_invalidates_token(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        username = uuid.uuid4().hex

        passwd = uuid.uuid4().hex

        user = client.users.create(name=username, password=passwd,

                                   email=uuid.uuid4().hex)

        token_id = client.tokens.authenticate(username=username,

                                              password=passwd).id

        # authenticate with a token should work before a password change

        client.tokens.authenticate(token=token_id)

        client.users.update_password(user=user.id, password=uuid.uuid4().hex)

        # authenticate with a token should not work after a password change

        self.assertRaises(client_exceptions.Unauthorized,

                          client.tokens.authenticate,

                          token=token_id)

**** CubicPower OpenStack Study ****

    def test_disable_tenant_invalidates_token(self):

        from keystoneclient import exceptions as client_exceptions

        admin_client = self.get_client(admin=True)

        foo_client = self.get_client(self.user_foo)

        tenant_bar = admin_client.tenants.get(self.tenant_bar['id'])

        # Disable the tenant.

        tenant_bar.update(enabled=False)

        # Test that the token has been removed.

        self.assertRaises(client_exceptions.Unauthorized,

                          foo_client.tokens.authenticate,

                          token=foo_client.auth_token)

        # Test that the user access has been disabled.

        self.assertRaises(client_exceptions.Unauthorized,

                          self.get_client,

                          self.user_foo)

**** CubicPower OpenStack Study ****

    def test_delete_tenant_invalidates_token(self):

        from keystoneclient import exceptions as client_exceptions

        admin_client = self.get_client(admin=True)

        foo_client = self.get_client(self.user_foo)

        tenant_bar = admin_client.tenants.get(self.tenant_bar['id'])

        # Delete the tenant.

        tenant_bar.delete()

        # Test that the token has been removed.

        self.assertRaises(client_exceptions.Unauthorized,

                          foo_client.tokens.authenticate,

                          token=foo_client.auth_token)

        # Test that the user access has been disabled.

        self.assertRaises(client_exceptions.Unauthorized,

                          self.get_client,

                          self.user_foo)

**** CubicPower OpenStack Study ****

    def test_disable_user_invalidates_token(self):

        from keystoneclient import exceptions as client_exceptions

        admin_client = self.get_client(admin=True)

        foo_client = self.get_client(self.user_foo)

        admin_client.users.update_enabled(user=self.user_foo['id'],

                                          enabled=False)

        self.assertRaises(client_exceptions.Unauthorized,

                          foo_client.tokens.authenticate,

                          token=foo_client.auth_token)

        self.assertRaises(client_exceptions.Unauthorized,

                          self.get_client,

                          self.user_foo)

**** CubicPower OpenStack Study ****

    def test_delete_user_invalidates_token(self):

        from keystoneclient import exceptions as client_exceptions

        admin_client = self.get_client(admin=True)

        client = self.get_client(admin=False)

        username = uuid.uuid4().hex

        password = uuid.uuid4().hex

        user_id = admin_client.users.create(

            name=username, password=password, email=uuid.uuid4().hex).id

        token_id = client.tokens.authenticate(

            username=username, password=password).id

        # token should be usable before the user is deleted

        client.tokens.authenticate(token=token_id)

        admin_client.users.delete(user=user_id)

        # authenticate with a token should not work after the user is deleted

        self.assertRaises(client_exceptions.Unauthorized,

                          client.tokens.authenticate,

                          token=token_id)

    @mock.patch.object(timeutils, 'utcnow')

**** CubicPower OpenStack Study ****

    def test_token_expiry_maintained(self, mock_utcnow):

        now = datetime.datetime.utcnow()

        mock_utcnow.return_value = now

        foo_client = self.get_client(self.user_foo)

        orig_token = foo_client.service_catalog.catalog['token']

        mock_utcnow.return_value = now + datetime.timedelta(seconds=1)

        reauthenticated_token = foo_client.tokens.authenticate(

            token=foo_client.auth_token)

        self.assertCloseEnoughForGovernmentWork(

            timeutils.parse_isotime(orig_token['expires']),

            timeutils.parse_isotime(reauthenticated_token.expires))

**** CubicPower OpenStack Study ****

    def test_user_create_update_delete(self):

        from keystoneclient import exceptions as client_exceptions

        test_username = 'new_user'

        client = self.get_client(admin=True)

        user = client.users.create(name=test_username,

                                   password='password',

                                   email='user1@test.com')

        self.assertEqual(user.name, test_username)

        user = client.users.get(user=user.id)

        self.assertEqual(user.name, test_username)

        user = client.users.update(user=user,

                                   name=test_username,

                                   email='user2@test.com')

        self.assertEqual(user.email, 'user2@test.com')

        # NOTE(termie): update_enabled doesn't return anything, probably a bug

        client.users.update_enabled(user=user, enabled=False)

        user = client.users.get(user.id)

        self.assertFalse(user.enabled)

        self.assertRaises(client_exceptions.Unauthorized,

                          self._client,

                          username=test_username,

                          password='password')

        client.users.update_enabled(user, True)

        user = client.users.update_password(user=user, password='password2')

        self._client(username=test_username,

                     password='password2')

        user = client.users.update_tenant(user=user, tenant='bar')

        # TODO(ja): once keystonelight supports default tenant

        #           when you login without specifying tenant, the

        #           token should be scoped to tenant 'bar'

        client.users.delete(user.id)

        self.assertRaises(client_exceptions.NotFound, client.users.get,

                          user.id)

        # Test creating a user with a tenant (auto-add to tenant)

        user2 = client.users.create(name=test_username,

                                    password='password',

                                    email='user1@test.com',

                                    tenant_id='bar')

        self.assertEqual(user2.name, test_username)

**** CubicPower OpenStack Study ****

    def test_update_default_tenant_to_existing_value(self):

        client = self.get_client(admin=True)

        user = client.users.create(

            name=uuid.uuid4().hex,

            password=uuid.uuid4().hex,

            email=uuid.uuid4().hex,

            tenant_id=self.tenant_bar['id'])

        # attempting to update the tenant with the existing value should work

        user = client.users.update_tenant(

            user=user, tenant=self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_user_create_no_string_password(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.BadRequest,

                          client.users.create,

                          name='test_user',

                          password=12345,

                          email=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_create_no_name(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.BadRequest,

                          client.users.create,

                          name="",

                          password=uuid.uuid4().hex,

                          email=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_create_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.users.create,

                          name=uuid.uuid4().hex,

                          password=uuid.uuid4().hex,

                          email=uuid.uuid4().hex,

                          tenant_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_get_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.users.get,

                          user=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_list_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.users.list,

                          tenant_id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_update_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.users.update,

                          user=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_update_tenant(self):

        client = self.get_client(admin=True)

        tenant_id = uuid.uuid4().hex

        user = client.users.update(user=self.user_foo['id'],

                                   tenant_id=tenant_id)

        self.assertEqual(tenant_id, user.tenant_id)

**** CubicPower OpenStack Study ****

    def test_user_update_password_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.users.update_password,

                          user=uuid.uuid4().hex,

                          password=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.users.delete,

                          user=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_list(self):

        client = self.get_client(admin=True)

        users = client.users.list()

        self.assertTrue(len(users) > 0)

        user = users[0]

        self.assertRaises(AttributeError, lambda: user.password)

**** CubicPower OpenStack Study ****

    def test_user_get(self):

        client = self.get_client(admin=True)

        user = client.users.get(user=self.user_foo['id'])

        self.assertRaises(AttributeError, lambda: user.password)

**** CubicPower OpenStack Study ****

    def test_role_get(self):

        client = self.get_client(admin=True)

        role = client.roles.get(role=self.role_admin['id'])

        self.assertEqual(role.id, self.role_admin['id'])

**** CubicPower OpenStack Study ****

    def test_role_crud(self):

        from keystoneclient import exceptions as client_exceptions

        test_role = 'new_role'

        client = self.get_client(admin=True)

        role = client.roles.create(name=test_role)

        self.assertEqual(role.name, test_role)

        role = client.roles.get(role=role.id)

        self.assertEqual(role.name, test_role)

        client.roles.delete(role=role.id)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.delete,

                          role=role.id)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.get,

                          role=role.id)

**** CubicPower OpenStack Study ****

    def test_role_create_no_name(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.BadRequest,

                          client.roles.create,

                          name="")

**** CubicPower OpenStack Study ****

    def test_role_get_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.get,

                          role=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_role_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.delete,

                          role=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_role_list_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.roles_for_user,

                          user=uuid.uuid4().hex,

                          tenant=uuid.uuid4().hex)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.roles_for_user,

                          user=self.user_foo['id'],

                          tenant=uuid.uuid4().hex)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.roles_for_user,

                          user=uuid.uuid4().hex,

                          tenant=self.tenant_bar['id'])

**** CubicPower OpenStack Study ****

    def test_role_list(self):

        client = self.get_client(admin=True)

        roles = client.roles.list()

        # TODO(devcamcar): This assert should be more specific.

        self.assertTrue(len(roles) > 0)

**** CubicPower OpenStack Study ****

    def test_service_crud(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        service_name = uuid.uuid4().hex

        service_type = uuid.uuid4().hex

        service_desc = uuid.uuid4().hex

        # create & read

        service = client.services.create(name=service_name,

                                         service_type=service_type,

                                         description=service_desc)

        self.assertEqual(service_name, service.name)

        self.assertEqual(service_type, service.type)

        self.assertEqual(service_desc, service.description)

        service = client.services.get(id=service.id)

        self.assertEqual(service_name, service.name)

        self.assertEqual(service_type, service.type)

        self.assertEqual(service_desc, service.description)

        service = [x for x in client.services.list() if x.id == service.id][0]

        self.assertEqual(service_name, service.name)

        self.assertEqual(service_type, service.type)

        self.assertEqual(service_desc, service.description)

        # update is not supported in API v2...

        # delete & read

        client.services.delete(id=service.id)

        self.assertRaises(client_exceptions.NotFound,

                          client.services.get,

                          id=service.id)

        services = [x for x in client.services.list() if x.id == service.id]

        self.assertEqual(len(services), 0)

**** CubicPower OpenStack Study ****

    def test_service_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.services.delete,

                          id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_service_get_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.services.get,

                          id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_endpoint_delete_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.endpoints.delete,

                          id=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_admin_requires_adminness(self):

        from keystoneclient import exceptions as client_exceptions

        # FIXME(ja): this should be Unauthorized

        exception = client_exceptions.ClientException

        two = self.get_client(self.user_two, admin=True)  # non-admin user

        # USER CRUD

        self.assertRaises(exception,

                          two.users.list)

        self.assertRaises(exception,

                          two.users.get,

                          user=self.user_two['id'])

        self.assertRaises(exception,

                          two.users.create,

                          name='oops',

                          password='password',

                          email='oops@test.com')

        self.assertRaises(exception,

                          two.users.delete,

                          user=self.user_foo['id'])

        # TENANT CRUD

        self.assertRaises(exception,

                          two.tenants.list)

        self.assertRaises(exception,

                          two.tenants.get,

                          tenant_id=self.tenant_bar['id'])

        self.assertRaises(exception,

                          two.tenants.create,

                          tenant_name='oops',

                          description="shouldn't work!",

                          enabled=True)

        self.assertRaises(exception,

                          two.tenants.delete,

                          tenant=self.tenant_baz['id'])

        # ROLE CRUD

        self.assertRaises(exception,

                          two.roles.get,

                          role=self.role_admin['id'])

        self.assertRaises(exception,

                          two.roles.list)

        self.assertRaises(exception,

                          two.roles.create,

                          name='oops')

        self.assertRaises(exception,

                          two.roles.delete,

                          role=self.role_admin['id'])

        # TODO(ja): MEMBERSHIP CRUD

        # TODO(ja): determine what else todo

**** CubicPower OpenStack Study ****

    def test_tenant_add_and_remove_user(self):

        client = self.get_client(admin=True)

        client.roles.add_user_role(tenant=self.tenant_bar['id'],

                                   user=self.user_two['id'],

                                   role=self.role_other['id'])

        user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])

        self.assertTrue(self.user_two['id'] in [x.id for x in user_refs])

        client.roles.remove_user_role(tenant=self.tenant_bar['id'],

                                      user=self.user_two['id'],

                                      role=self.role_other['id'])

        roles = client.roles.roles_for_user(user=self.user_foo['id'],

                                            tenant=self.tenant_bar['id'])

        self.assertNotIn(self.role_other['id'], roles)

        user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])

        self.assertNotIn(self.user_two['id'], [x.id for x in user_refs])

**** CubicPower OpenStack Study ****

    def test_user_role_add_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.add_user_role,

                          tenant=uuid.uuid4().hex,

                          user=self.user_foo['id'],

                          role=self.role_member['id'])

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.add_user_role,

                          tenant=self.tenant_baz['id'],

                          user=self.user_foo['id'],

                          role=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_user_role_add_no_user(self):

        # If add_user_role and user doesn't exist, doesn't fail.

        client = self.get_client(admin=True)

        client.roles.add_user_role(tenant=self.tenant_baz['id'],

                                   user=uuid.uuid4().hex,

                                   role=self.role_member['id'])

**** CubicPower OpenStack Study ****

    def test_user_role_remove_404(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(admin=True)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.remove_user_role,

                          tenant=uuid.uuid4().hex,

                          user=self.user_foo['id'],

                          role=self.role_member['id'])

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.remove_user_role,

                          tenant=self.tenant_baz['id'],

                          user=uuid.uuid4().hex,

                          role=self.role_member['id'])

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.remove_user_role,

                          tenant=self.tenant_baz['id'],

                          user=self.user_foo['id'],

                          role=uuid.uuid4().hex)

        self.assertRaises(client_exceptions.NotFound,

                          client.roles.remove_user_role,

                          tenant=self.tenant_baz['id'],

                          user=self.user_foo['id'],

                          role=self.role_member['id'])

**** CubicPower OpenStack Study ****

    def test_tenant_list_marker(self):

        client = self.get_client()

        # Add two arbitrary tenants to user for testing purposes

        for i in range(2):

            tenant_id = uuid.uuid4().hex

            tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id,

                      'domain_id': DEFAULT_DOMAIN_ID}

            self.assignment_api.create_project(tenant_id, tenant)

            self.assignment_api.add_user_to_project(tenant_id,

                                                    self.user_foo['id'])

        tenants = client.tenants.list()

        self.assertEqual(len(tenants), 3)

        tenants_marker = client.tenants.list(marker=tenants[0].id)

        self.assertEqual(len(tenants_marker), 2)

        self.assertEqual(tenants[1].name, tenants_marker[0].name)

        self.assertEqual(tenants[2].name, tenants_marker[1].name)

**** CubicPower OpenStack Study ****

    def test_tenant_list_marker_not_found(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client()

        self.assertRaises(client_exceptions.BadRequest,

                          client.tenants.list, marker=uuid.uuid4().hex)

**** CubicPower OpenStack Study ****

    def test_tenant_list_limit(self):

        client = self.get_client()

        # Add two arbitrary tenants to user for testing purposes

        for i in range(2):

            tenant_id = uuid.uuid4().hex

            tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id,

                      'domain_id': DEFAULT_DOMAIN_ID}

            self.assignment_api.create_project(tenant_id, tenant)

            self.assignment_api.add_user_to_project(tenant_id,

                                                    self.user_foo['id'])

        tenants = client.tenants.list()

        self.assertEqual(len(tenants), 3)

        tenants_limited = client.tenants.list(limit=2)

        self.assertEqual(len(tenants_limited), 2)

        self.assertEqual(tenants[0].name, tenants_limited[0].name)

        self.assertEqual(tenants[1].name, tenants_limited[1].name)

**** CubicPower OpenStack Study ****

    def test_tenant_list_limit_bad_value(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client()

        self.assertRaises(client_exceptions.BadRequest,

                          client.tenants.list, limit='a')

        self.assertRaises(client_exceptions.BadRequest,

                          client.tenants.list, limit=-1)

**** CubicPower OpenStack Study ****

    def test_roles_get_by_user(self):

        client = self.get_client(admin=True)

        roles = client.roles.roles_for_user(user=self.user_foo['id'],

                                            tenant=self.tenant_bar['id'])

        self.assertTrue(len(roles) > 0)

**** CubicPower OpenStack Study ****

    def test_user_can_update_passwd(self):

        client = self.get_client(self.user_two)

        token_id = client.auth_token

        new_password = uuid.uuid4().hex

        # TODO(derekh): Update to use keystoneclient when available

        class FakeResponse(object):

            def start_fake_response(self, status, headers):

                self.response_status = int(status.split(' ', 1)[0])

                self.response_headers = dict(headers)

        responseobject = FakeResponse()

        req = webob.Request.blank(

            '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'],

            headers={'X-Auth-Token': token_id})

        req.method = 'PATCH'

        req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %

                    (new_password, self.user_two['password']))

        self.public_server.application(req.environ,

                                       responseobject.start_fake_response)

        self.user_two['password'] = new_password

        self.get_client(self.user_two)

**** CubicPower OpenStack Study ****

    def test_user_cannot_update_other_users_passwd(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(self.user_two)

        token_id = client.auth_token

        new_password = uuid.uuid4().hex

        # TODO(derekh): Update to use keystoneclient when available

        class FakeResponse(object):

            def start_fake_response(self, status, headers):

                self.response_status = int(status.split(' ', 1)[0])

                self.response_headers = dict(headers)

        responseobject = FakeResponse()

        req = webob.Request.blank(

            '/v2.0/OS-KSCRUD/users/%s' % self.user_foo['id'],

            headers={'X-Auth-Token': token_id})

        req.method = 'PATCH'

        req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %

                    (new_password, self.user_two['password']))

        self.public_server.application(req.environ,

                                       responseobject.start_fake_response)

        self.assertEqual(403, responseobject.response_status)

        self.user_two['password'] = new_password

        self.assertRaises(client_exceptions.Unauthorized,

                          self.get_client, self.user_two)

**** CubicPower OpenStack Study ****

    def test_tokens_after_user_update_passwd(self):

        from keystoneclient import exceptions as client_exceptions

        client = self.get_client(self.user_two)

        token_id = client.auth_token

        new_password = uuid.uuid4().hex

        # TODO(derekh): Update to use keystoneclient when available

        class FakeResponse(object):

            def start_fake_response(self, status, headers):

                self.response_status = int(status.split(' ', 1)[0])

                self.response_headers = dict(headers)

        responseobject = FakeResponse()

        req = webob.Request.blank(

            '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'],

            headers={'X-Auth-Token': token_id})

        req.method = 'PATCH'

        req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %

                    (new_password, self.user_two['password']))

        rv = self.public_server.application(

            req.environ,

            responseobject.start_fake_response)

        response_json = jsonutils.loads(rv.pop())

        new_token_id = response_json['access']['token']['id']

        self.assertRaises(client_exceptions.Unauthorized, client.tenants.list)

        client.auth_token = new_token_id

        client.tenants.list()

**** CubicPower OpenStack Study ****

class KcMasterTestCase(CompatTestCase, KeystoneClientTests):
    """Run the KeystoneClientTests suite with the ``master`` client."""

    # Pair of the keystoneclient git repository URL and the ref to use.
    # NOTE(review): presumably consumed by CompatTestCase.setUp to check out
    # the client -- confirm.
    checkout_info = (KEYSTONECLIENT_REPO, 'master')

**** CubicPower OpenStack Study ****

class KcOptTestCase(KcMasterTestCase):
    """Run the client tests against a keystoneclient tree on local disk."""

    # Set KSCTEST_PATH to the keystoneclient directory, then run this test.
    #
    # For example, to test your local keystoneclient,
    #
    # KSCTEST_PATH=/opt/stack/python-keystoneclient \
    #  tox -e py27 test_keystoneclient.KcOptTestCase

    def setUp(self):
        """Use KSCTEST_PATH as the checkout location, or skip the test.

        The environment variable is read before the parent ``setUp`` runs,
        so the instance-level ``checkout_info`` replaces the class default.
        """
        self.checkout_info = os.environ.get('KSCTEST_PATH')
        if not self.checkout_info:
            # skipTest is the standard unittest/testtools spelling.
            self.skipTest('Set KSCTEST_PATH env to test with local client')
        super(KcOptTestCase, self).setUp()