¡@

Home 

OpenStack Study: test_gridfs_store.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright 2013 OpenStack Foundation

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import six

import stubout

from glance.common import exception

from glance.common import utils

from glance.store.gridfs import Store

from glance.tests.unit import base

# gridfs and pymongo are optional dependencies. When either import fails,
# pymongo is bound to None; code in this module tests ``pymongo is None``
# before touching either package.
try:
    import gridfs
    import pymongo
except ImportError:
    pymongo = None

# Configuration overrides applied by the test case: selects the gridfs
# store and points it at placeholder MongoDB URI / database names.
GRIDFS_CONF = dict(
    verbose=True,
    debug=True,
    default_store='gridfs',
    mongodb_store_uri='mongodb://fake_store_uri',
    mongodb_store_db='fake_store_db',
)

**** CubicPower OpenStack Study ****

def stub_out_gridfs(stubs):
    """Replace ``pymongo.MongoClient`` and ``gridfs.GridFS`` with fakes.

    Nothing is installed when the module-level ``pymongo`` is None (i.e.
    the optional import failed).

    :param stubs: object exposing ``Set(parent, attr_name, value)``; it is
                  used to install the two fake classes.
    """

    class FakeMongoClient(object):
        """Client stand-in: accepts any arguments, every item lookup is None."""

        def __init__(self, *args, **kwargs):
            pass

        def __getitem__(self, key):
            return None

    class FakeGridFS(object):
        """GridFS stand-in that records which operations were invoked."""

        # Both containers are class attributes, so all instances of this
        # class share them. The class is re-created on every
        # stub_out_gridfs() call, which gives each call fresh containers.
        image_data = {}
        called_commands = []

        def __init__(self, *args, **kwargs):
            pass

        def exists(self, image_id):
            """Record the call and report that the image is not stored."""
            self.called_commands.append('exists')
            return False

        def put(self, image_file, _id):
            """Record the call, then copy ``image_file`` into ``image_data``.

            The file is read 64 units at a time until ``read`` returns a
            falsy value; chunks are appended under ``image_data[_id]``.
            """
            self.called_commands.append('put')
            while True:
                chunk = image_file.read(64)
                if not chunk:
                    break
                # setdefault seeds the entry with '' on the first chunk.
                stored = self.image_data.setdefault(_id, '')
                self.image_data[_id] = stored + chunk

        def delete(self, _id):
            """Record the call; ``image_data`` is left untouched."""
            self.called_commands.append('delete')

    if pymongo is not None:
        stubs.Set(pymongo, 'MongoClient', FakeMongoClient)
        stubs.Set(gridfs, 'GridFS', FakeGridFS)

**** CubicPower OpenStack Study ****

class TestStore(base.StoreClearingUnitTest):
    """Tests for the GridFS store, run against the stubbed-out GridFS."""

    def setUp(self):
        """Establish a clean test environment"""
        self.config(**GRIDFS_CONF)
        super(TestStore, self).setUp()
        self.stubs = stubout.StubOutForTesting()
        # Register the cleanup before anything else can raise, so installed
        # stubs are always removed even if a later setUp step fails.
        self.addCleanup(self.stubs.UnsetAll)
        stub_out_gridfs(self.stubs)
        self.store = Store()

    def test_cleanup_when_add_image_exception(self):
        """Check the store calls exists, put, delete when add() fails.

        The reader is limited to 1 while the data is 'xx', and add() is
        expected to raise ImageSizeLimitExceeded.
        """
        if pymongo is None:
            msg = 'GridFS store can not add images, skip test.'
            self.skipTest(msg)

        self.assertRaises(exception.ImageSizeLimitExceeded,
                          self.store.add,
                          'fake_image_id',
                          utils.LimitingReader(six.StringIO('xx'), 1),
                          2)
        self.assertEqual(self.store.fs.called_commands,
                         ['exists', 'put', 'delete'])