¡@

Home 

OpenStack Study: test_base.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (c) 2013 OpenStack Foundation

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import abc

import functools

import os

import fixtures

import six

from keystone.openstack.common.db.sqlalchemy import session

from keystone.openstack.common.db.sqlalchemy import utils

from keystone.openstack.common.fixture import lockutils

from keystone.openstack.common import test

**** CubicPower OpenStack Study ****

class DbFixture(fixtures.Fixture):

"""Basic database fixture.

Allows to run tests on various db backends, such as SQLite, MySQL and

PostgreSQL. By

**** CubicPower OpenStack Study ****

    def _get_uri(self):

        return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')

**** CubicPower OpenStack Study ****

    def __init__(self, test):

        super(DbFixture, self).__init__()

        self.test = test

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(DbFixture, self).setUp()

        self.test.engine = session.create_engine(self._get_uri())

        self.test.sessionmaker = session.get_maker(self.test.engine)

        self.addCleanup(self.test.engine.dispose)

**** CubicPower OpenStack Study ****

class DbTestCase(test.BaseTestCase):

"""Base class for testing of DB code.

Using `DbFixture`. Intended to be the main database test case to use all

the tests on a given backend with user

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(DbTestCase, self).setUp()

        self.useFixture(self.FIXTURE(self))

ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']

def backend_specific(*dialects):

    """Decorator to skip backend specific tests on inappropriate engines.

    ::dialects: list of dialects names under which the test will be launched.

    """

**** CubicPower OpenStack Study ****

def backend_specific(*dialects):

    """Decorator to skip backend specific tests on inappropriate engines.

    ::dialects: list of dialects names under which the test will be launched.

    """

**** CubicPower OpenStack Study ****

    def wrap(f):

        @functools.wraps(f)

        def ins_wrap(self):

            if not set(dialects).issubset(ALLOWED_DIALECTS):

                raise ValueError(

                    "Please use allowed dialects: %s" % ALLOWED_DIALECTS)

            if self.engine.name not in dialects:

                msg = ('The test "%s" can be run '

                       'only on %s. Current engine is %s.')

                args = (f.__name__, ' '.join(dialects), self.engine.name)

                self.skip(msg % args)

            else:

                return f(self)

        return ins_wrap

    return wrap

@six.add_metaclass(abc.ABCMeta)

**** CubicPower OpenStack Study ****

        def ins_wrap(self):

            if not set(dialects).issubset(ALLOWED_DIALECTS):

                raise ValueError(

                    "Please use allowed dialects: %s" % ALLOWED_DIALECTS)

            if self.engine.name not in dialects:

                msg = ('The test "%s" can be run '

                       'only on %s. Current engine is %s.')

                args = (f.__name__, ' '.join(dialects), self.engine.name)

                self.skip(msg % args)

            else:

                return f(self)

        return ins_wrap

    return wrap

@six.add_metaclass(abc.ABCMeta)

**** CubicPower OpenStack Study ****

class OpportunisticFixture(DbFixture):

"""Base fixture to use

**** CubicPower OpenStack Study ****

    def _get_uri(self):

        return utils.get_connect_string(backend=self.DRIVER,

                                        user=self.USERNAME,

                                        passwd=self.PASSWORD,

                                        database=self.DBNAME)

@six.add_metaclass(abc.ABCMeta)

**** CubicPower OpenStack Study ****

class OpportunisticTestCase(DbTestCase):

"""Base test case to use

**** CubicPower OpenStack Study ****

    def setUp(self):

        # TODO(bnemec): Remove this once infra is ready for

        # https://review.openstack.org/#/c/74963/ to merge.

        self.useFixture(lockutils.LockFixture('opportunistic-db'))

        credentials = {

            'backend': self.FIXTURE.DRIVER,

            'user': self.FIXTURE.USERNAME,

            'passwd': self.FIXTURE.PASSWORD,

            'database': self.FIXTURE.DBNAME}

        if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials):

            msg = '%s backend is not available.' % self.FIXTURE.DRIVER

            return self.skip(msg)

        super(OpportunisticTestCase, self).setUp()

**** CubicPower OpenStack Study ****

class MySQLOpportunisticFixture(OpportunisticFixture):

DRIVER = 'mysql'

**** CubicPower OpenStack Study ****

class PostgreSQLOpportunisticFixture(OpportunisticFixture):

DRIVER = 'postgresql'

**** CubicPower OpenStack Study ****

class MySQLOpportunisticTestCase(OpportunisticTestCase):

FIXTURE = MySQLOpportunisticFixture

**** CubicPower OpenStack Study ****

class PostgreSQLOpportunisticTestCase(OpportunisticTestCase):

FIXTURE = PostgreSQLOpportunisticFixture