

# OpenStack Study: test_revoke.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at


# http://www.apache.org/licenses/LICENSE-2.0


# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import datetime

import uuid

import mock

from keystone.common import dependency

from keystone import config

from keystone.contrib.revoke import model

from keystone import exception

from keystone.openstack.common import timeutils

from keystone import tests

from keystone.tests import test_backend_sql

CONF = config.CONF

**** CubicPower OpenStack Study ****

def _new_id():

    return uuid.uuid4().hex

**** CubicPower OpenStack Study ****

def _future_time():
    """Return a timestamp 1000 seconds after ``timeutils.utcnow()``."""
    return timeutils.utcnow() + datetime.timedelta(seconds=1000)

**** CubicPower OpenStack Study ****

def _past_time():
    """Return a timestamp 1000 days before ``timeutils.utcnow()``."""
    return timeutils.utcnow() + datetime.timedelta(days=-1000)

**** CubicPower OpenStack Study ****

def _sample_blank_token():
    """Return blank token data issued two minutes before the current time.

    The data is produced by ``model.blank_token_data`` from an issue
    time of ``timeutils.utcnow()`` minus two minutes.
    """
    two_minutes_ago = timeutils.utcnow() + datetime.timedelta(minutes=-2)
    return model.blank_token_data(two_minutes_ago)

**** CubicPower OpenStack Study ****

def _matches(event, token_values):

    """See if the token matches the revocation event.

    Used as a secondary check on the logic to Check

    By Tree Below:  This is abrute force approach to checking.

    Compare each attribute from the event with the corresponding

    value from the token.  If the event does not have a value for

    the attribute, a match is still possible.  If the event has a

    value for the attribute, and it does not match the token, no match

    is possible, so skip the remaining checks.

    :param event one revocation event to match

    :param token_values dictionary with set of values taken from the


    :returns if the token matches the revocation event, indicating the

    token has been revoked


    # The token has three attributes that can match the user_id

    if event.user_id is not None:

        for attribute_name in ['user_id', 'trustor_id', 'trustee_id']:

            if event.user_id == token_values[attribute_name]:



            return False

    # The token has two attributes that can match the domain_id

    if event.domain_id is not None:

        dom_id_matched = False

        for attribute_name in ['user_domain_id', 'project_domain_id']:

            if event.domain_id == token_values[attribute_name]:

                dom_id_matched = True


        if not dom_id_matched:

            return False

    # If any one check does not match, the while token does

    # not match the event. The numerous return False indicate

    # that the token is still valid and short-circuits the

    # rest of the logic.

    attribute_names = ['project_id',

                       'expires_at', 'trust_id', 'consumer_id',


    for attribute_name in attribute_names:

        if getattr(event, attribute_name) is not None:

            if (getattr(event, attribute_name) !=


                        return False

    if event.role_id is not None:

        roles = token_values['roles']

        role_found = False

        for role in roles:

            if event.role_id == role:

                role_found = True


        if not role_found:

            return False

    if token_values['issued_at'] > event.issued_before:

        return False

    return True


**** CubicPower OpenStack Study ****

class RevokeTests(object):

**** CubicPower OpenStack Study ****

    def test_list(self):


        self.assertEqual(1, len(self.revoke_api.get_events()))


        self.assertEqual(2, len(self.revoke_api.get_events()))

**** CubicPower OpenStack Study ****

    def test_list_since(self):



        past = timeutils.utcnow() - datetime.timedelta(seconds=1000)

        self.assertEqual(2, len(self.revoke_api.get_events(past)))

        future = timeutils.utcnow() + datetime.timedelta(seconds=1000)

        self.assertEqual(0, len(self.revoke_api.get_events(future)))

**** CubicPower OpenStack Study ****

    def test_past_expiry_are_removed(self):

        user_id = 1

        self.revoke_api.revoke_by_expiration(user_id, _future_time())

        self.assertEqual(1, len(self.revoke_api.get_events()))

        event = model.RevokeEvent()

        event.revoked_at = _past_time()


        self.assertEqual(1, len(self.revoke_api.get_events()))

    @mock.patch.object(timeutils, 'utcnow')

**** CubicPower OpenStack Study ****

    def test_expired_events_removed_validate_token_success(self, mock_utcnow):

        def _sample_token_values():

            token = _sample_blank_token()

            token['expires_at'] = timeutils.isotime(_future_time(),


            return token

        now = datetime.datetime.utcnow()

        now_plus_2h = now + datetime.timedelta(hours=2)

        mock_utcnow.return_value = now

        # Build a token and validate it. This will seed the cache for the

        # future 'synchronize' call.

        token_values = _sample_token_values()

        user_id = _new_id()


        token_values['user_id'] = user_id




        # Move our clock forward by 2h, build a new token and validate it.

        # 'synchronize' should now be exercised and remove old expired events

        mock_utcnow.return_value = now_plus_2h

        self.revoke_api.revoke_by_expiration(_new_id(), now_plus_2h)

        #should no longer throw an exception


**** CubicPower OpenStack Study ****

        def _sample_token_values():

            token = _sample_blank_token()

            token['expires_at'] = timeutils.isotime(_future_time(),


            return token

        now = datetime.datetime.utcnow()

        now_plus_2h = now + datetime.timedelta(hours=2)

        mock_utcnow.return_value = now

        # Build a token and validate it. This will seed the cache for the

        # future 'synchronize' call.

        token_values = _sample_token_values()

        user_id = _new_id()


        token_values['user_id'] = user_id




        # Move our clock forward by 2h, build a new token and validate it.

        # 'synchronize' should now be exercised and remove old expired events

        mock_utcnow.return_value = now_plus_2h

        self.revoke_api.revoke_by_expiration(_new_id(), now_plus_2h)

        #should no longer throw an exception


**** CubicPower OpenStack Study ****

class SqlRevokeTests(test_backend_sql.SqlTests, RevokeTests):

**** CubicPower OpenStack Study ****

    def config_overrides(self):

        super(SqlRevokeTests, self).config_overrides()








**** CubicPower OpenStack Study ****

class KvsRevokeTests(tests.TestCase, RevokeTests):

**** CubicPower OpenStack Study ****

    def config_overrides(self):

        super(KvsRevokeTests, self).config_overrides()








**** CubicPower OpenStack Study ****

    def setUp(self):

        super(KvsRevokeTests, self).setUp()


**** CubicPower OpenStack Study ****

class RevokeTreeTests(tests.TestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(RevokeTreeTests, self).setUp()

        self.events = []

        self.tree = model.RevokeTree()


**** CubicPower OpenStack Study ****

    def _sample_data(self):

        user_ids = []

        project_ids = []

        role_ids = []

        for i in range(0, 3):




        project_tokens = []

        i = len(project_tokens)


        project_tokens[i]['user_id'] = user_ids[0]

        project_tokens[i]['project_id'] = project_ids[0]

        project_tokens[i]['roles'] = [role_ids[1]]

        i = len(project_tokens)


        project_tokens[i]['user_id'] = user_ids[1]

        project_tokens[i]['project_id'] = project_ids[0]

        project_tokens[i]['roles'] = [role_ids[0]]

        i = len(project_tokens)


        project_tokens[i]['user_id'] = user_ids[0]

        project_tokens[i]['project_id'] = project_ids[1]

        project_tokens[i]['roles'] = [role_ids[0]]

        token_to_revoke = _sample_blank_token()

        token_to_revoke['user_id'] = user_ids[0]

        token_to_revoke['project_id'] = project_ids[0]

        token_to_revoke['roles'] = [role_ids[0]]

        self.project_tokens = project_tokens

        self.user_ids = user_ids

        self.project_ids = project_ids

        self.role_ids = role_ids

        self.token_to_revoke = token_to_revoke

**** CubicPower OpenStack Study ****

    def _assertTokenRevoked(self, token_data):

        self.assertTrue(any([_matches(e, token_data) for e in self.events]))

        return self.assertTrue(self.tree.is_revoked(token_data),

                               'Token should be revoked')

**** CubicPower OpenStack Study ****

    def _assertTokenNotRevoked(self, token_data):

        self.assertFalse(any([_matches(e, token_data) for e in self.events]))

        return self.assertFalse(self.tree.is_revoked(token_data),

                                'Token should not be revoked')

**** CubicPower OpenStack Study ****

    def _revoke_by_user(self, user_id):

        return self.tree.add_event(


**** CubicPower OpenStack Study ****

    def _revoke_by_expiration(self, user_id, expires_at):

        event = self.tree.add_event(




        return event

**** CubicPower OpenStack Study ****

    def _revoke_by_grant(self, role_id, user_id=None,

                         domain_id=None, project_id=None):

        event = self.tree.add_event(






        return event

**** CubicPower OpenStack Study ****

    def _revoke_by_user_and_project(self, user_id, project_id):

        event = self.tree.add_event(




        return event

**** CubicPower OpenStack Study ****

    def _revoke_by_project_role_assignment(self, project_id, role_id):

        event = self.tree.add_event(




        return event

**** CubicPower OpenStack Study ****

    def _revoke_by_domain_role_assignment(self, domain_id, role_id):

        event = self.tree.add_event(




        return event

**** CubicPower OpenStack Study ****

    def _user_field_test(self, field_name):

        user_id = _new_id()

        event = self._revoke_by_user(user_id)


        token_data_u1 = _sample_blank_token()

        token_data_u1[field_name] = user_id


        token_data_u2 = _sample_blank_token()

        token_data_u2[field_name] = _new_id()





**** CubicPower OpenStack Study ****

    def test_revoke_by_user(self):


**** CubicPower OpenStack Study ****

    def test_revoke_by_user_matches_trustee(self):


**** CubicPower OpenStack Study ****

    def test_revoke_by_user_matches_trustor(self):


**** CubicPower OpenStack Study ****

    def test_by_user_expiration(self):

        future_time = _future_time()

        user_id = 1

        event = self._revoke_by_expiration(user_id, future_time)

        token_data_1 = _sample_blank_token()

        token_data_1['user_id'] = user_id

        token_data_1['expires_at'] = future_time


        token_data_2 = _sample_blank_token()

        token_data_2['user_id'] = user_id

        expire_delta = datetime.timedelta(seconds=2000)

        future_time = timeutils.utcnow() + expire_delta

        token_data_2['expires_at'] = future_time




**** CubicPower OpenStack Study ****

    def removeEvent(self, event):



**** CubicPower OpenStack Study ****

    def test_by_project_grant(self):

        token_to_revoke = self.token_to_revoke

        tokens = self.project_tokens


        for token in tokens:


        event = self._revoke_by_grant(role_id=self.role_ids[0],




        for token in tokens:




        for token in tokens:


        token_to_revoke['roles'] = [self.role_ids[0],



        event = self._revoke_by_grant(role_id=self.role_ids[0],






        event = self._revoke_by_grant(role_id=self.role_ids[1],
















**** CubicPower OpenStack Study ****

    def test_by_project_and_user_and_role(self):

        user_id1 = _new_id()

        user_id2 = _new_id()

        project_id = _new_id()



            self._revoke_by_user_and_project(user_id2, project_id))

        token_data = _sample_blank_token()

        token_data['user_id'] = user_id2

        token_data['project_id'] = project_id


**** CubicPower OpenStack Study ****

    def _assertEmpty(self, collection):

        return self.assertEqual(0, len(collection), "collection not empty")

**** CubicPower OpenStack Study ****

    def _assertEventsMatchIteration(self, turn):

        self.assertEqual(1, len(self.tree.revoke_map))

        self.assertEqual(turn + 1, len(self.tree.revoke_map




        # two different functions add  domain_ids, +1 for None

        self.assertEqual(2 * turn + 1, len(self.tree.revoke_map





        # two different functions add  project_ids, +1 for None

        self.assertEqual(2 * turn + 1, len(self.tree.revoke_map






        # 10 users added

        self.assertEqual(turn, len(self.tree.revoke_map







**** CubicPower OpenStack Study ****

    def test_cleanup(self):

        events = self.events


        for i in range(0, 10):




                self._revoke_by_expiration(_new_id(), _future_time()))


                self._revoke_by_project_role_assignment(_new_id(), _new_id()))


                self._revoke_by_domain_role_assignment(_new_id(), _new_id()))


                self._revoke_by_domain_role_assignment(_new_id(), _new_id()))


                self._revoke_by_user_and_project(_new_id(), _new_id()))

            self._assertEventsMatchIteration(i + 1)

        for event in self.events:

