# Home
#
# OpenStack Study: test_volume_metadata.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# Copyright 2013 OpenStack Foundation.

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import uuid

from oslo.config import cfg

import webob

from cinder.api import extensions

from cinder.api.v2 import volume_metadata

from cinder.api.v2 import volumes

from cinder import db

from cinder import exception

from cinder.openstack.common import jsonutils

from cinder import test

from cinder.tests.api import fakes

from cinder.tests.api.v2 import stubs

from cinder.volume import api as volume_api

CONF = cfg.CONF

**** CubicPower OpenStack Study ****

def return_create_volume_metadata_max(context, volume_id, metadata, delete):
    """Fake metadata-update callable returning the quota-sized payload.

    All four arguments are accepted and ignored; the result is whatever
    stub_max_volume_metadata() builds.
    """
    max_metadata = stub_max_volume_metadata()
    return max_metadata

**** CubicPower OpenStack Study ****

def return_create_volume_metadata(context, volume_id, metadata, delete):
    """Fake metadata-update callable returning the canned three-key dict.

    All four arguments are accepted and ignored; the result is whatever
    stub_volume_metadata() builds.
    """
    canned = stub_volume_metadata()
    return canned

**** CubicPower OpenStack Study ****

def return_new_volume_metadata(context, volume_id, metadata, delete):
    """Fake metadata-update callable returning the "new" metadata dict.

    All four arguments are accepted and ignored; the result is whatever
    stub_new_volume_metadata() builds.
    """
    replacement = stub_new_volume_metadata()
    return replacement

**** CubicPower OpenStack Study ****

def return_create_volume_metadata_insensitive(context, snapshot_id,
                                              metadata, delete):
    """Fake metadata-update callable returning the mixed-case metadata dict.

    All four arguments are accepted and ignored; the result is whatever
    stub_volume_metadata_insensitive() builds.

    NOTE(review): the second parameter is called ``snapshot_id`` while the
    sibling fakes call it ``volume_id``; the name is kept so keyword callers
    keep working.
    """
    mixed_case = stub_volume_metadata_insensitive()
    return mixed_case

**** CubicPower OpenStack Study ****

def return_volume_metadata(context, volume_id):
    """Fake metadata getter that insists on a 36-character string id.

    Raises a plain Exception when ``volume_id`` is not a ``str`` of length
    36; otherwise returns the dict built by stub_volume_metadata().
    ``context`` is ignored.
    """
    looks_like_uuid = isinstance(volume_id, str) and len(volume_id) == 36
    if looks_like_uuid:
        return stub_volume_metadata()

    msg = 'id %s must be a uuid in return volume metadata' % volume_id
    raise Exception(msg)

**** CubicPower OpenStack Study ****

def return_empty_volume_metadata(context, volume_id):
    """Fake metadata getter that always reports no metadata.

    Both arguments are ignored; a new empty dict is returned on each call.
    """
    return dict()

**** CubicPower OpenStack Study ****

def return_empty_container_metadata(context, volume_id, metadata, delete):
    """Fake metadata-update callable that always returns an empty dict.

    All four arguments are ignored; a new empty dict is returned on each
    call.
    """
    return dict()

**** CubicPower OpenStack Study ****

def delete_volume_metadata(context, volume_id, key):
    """Fake metadata-delete callable: accepts its arguments and does nothing."""
    return None

**** CubicPower OpenStack Study ****

def stub_volume_metadata():
    """Return a fresh dict mapping key1..key3 to value1..value3."""
    return dict(("key%d" % n, "value%d" % n) for n in (1, 2, 3))

**** CubicPower OpenStack Study ****

def stub_new_volume_metadata():
    """Return a fresh dict with the keys key10, key99 and KEY20."""
    return dict(key10='value10', key99='value99', KEY20='value20')

**** CubicPower OpenStack Study ****

def stub_volume_metadata_insensitive():
    """Return a fresh dict with three lower-case keys plus upper-case KEY4."""
    pairs = [
        ("key1", "value1"),
        ("key2", "value2"),
        ("key3", "value3"),
        ("KEY4", "value4"),
    ]
    return dict(pairs)

**** CubicPower OpenStack Study ****

def stub_max_volume_metadata():
    """Build a {"metadata": {...}} body with CONF.quota_metadata_items keys.

    Keys are "key0", "key1", ... and every value is "blah".
    """
    items = dict(('key%i' % num, "blah")
                 for num in range(CONF.quota_metadata_items))
    return {"metadata": items}

**** CubicPower OpenStack Study ****

def return_volume(context, volume_id):
    """Fake volume getter returning one fixed volume dict.

    Both arguments are ignored; the returned id is always the same
    hard-coded UUID and the metadata is a new empty dict.
    """
    fake_volume = dict(name='fake', metadata={})
    fake_volume['id'] = '0cc3346e-9fef-4445-abe6-5d2b2690ec64'
    return fake_volume

**** CubicPower OpenStack Study ****

def return_volume_nonexistent(context, volume_id):
    """Fake getter that always raises exception.VolumeNotFound.

    Both arguments are ignored.
    """
    not_found = exception.VolumeNotFound('bogus test message')
    raise not_found

**** CubicPower OpenStack Study ****

def fake_update_volume_metadata(self, context, volume, diff):
    """No-op stand-in for an ``update_volume_metadata`` method."""
    return None

**** CubicPower OpenStack Study ****

class volumeMetaDataTest(test.TestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):
        """Install the default db stubs and build the controllers under test.

        Records a random UUID string in ``self.req_id`` and the matching
        metadata URL in ``self.url``, then issues one ``create`` call on the
        volumes controller.
        """

        super(volumeMetaDataTest, self).setUp()

        self.volume_api = volume_api.API()

        fakes.stub_out_key_pair_funcs(self.stubs)

        # Default db fakes; individual tests replace these where needed.
        self.stubs.Set(db, 'volume_get', return_volume)

        self.stubs.Set(db, 'volume_metadata_get',

                       return_volume_metadata)

        self.stubs.Set(db, 'service_get_all_by_topic',

                       stubs.stub_service_get_all_by_topic)

        # NOTE(review): self.volume_api is not handed to either controller
        # created below; presumably they build their own API object, so
        # confirm this instance-level stub is actually reached.
        self.stubs.Set(self.volume_api, 'update_volume_metadata',

                       fake_update_volume_metadata)

        # Start from an extension manager with no extensions registered.
        self.ext_mgr = extensions.ExtensionManager()

        self.ext_mgr.extensions = {}

        self.volume_controller = volumes.VolumeController(self.ext_mgr)

        self.controller = volume_metadata.Controller()

        # A 36-character uuid string, as return_volume_metadata requires.
        self.req_id = str(uuid.uuid4())

        self.url = '/v2/fake/volumes/%s/metadata' % self.req_id

        vol = {"size": 100,

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "zone1:host1",

               "metadata": {}}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v2/volumes')

        self.volume_controller.create(req, body)

**** CubicPower OpenStack Study ****

    def test_index(self):

        req = fakes.HTTPRequest.blank(self.url)

        res_dict = self.controller.index(req, self.req_id)

        expected = {

            'metadata': {

                'key1': 'value1',

                'key2': 'value2',

                'key3': 'value3',

            },

        }

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_index_nonexistent_volume(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_volume_nonexistent)

        req = fakes.HTTPRequest.blank(self.url)

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.index, req, self.url)

**** CubicPower OpenStack Study ****

    def test_index_no_data(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_empty_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        res_dict = self.controller.index(req, self.req_id)

        expected = {'metadata': {}}

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_show(self):

        req = fakes.HTTPRequest.blank(self.url + '/key2')

        res_dict = self.controller.show(req, self.req_id, 'key2')

        expected = {'meta': {'key2': 'value2'}}

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_show_nonexistent_volume(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_volume_nonexistent)

        req = fakes.HTTPRequest.blank(self.url + '/key2')

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.show, req, self.req_id, 'key2')

**** CubicPower OpenStack Study ****

    def test_show_meta_not_found(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_empty_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key6')

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.show, req, self.req_id, 'key6')

**** CubicPower OpenStack Study ****

    def test_delete(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_volume_metadata)

        self.stubs.Set(db, 'volume_metadata_delete',

                       delete_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key2')

        req.method = 'DELETE'

        res = self.controller.delete(req, self.req_id, 'key2')

        self.assertEqual(200, res.status_int)

**** CubicPower OpenStack Study ****

    def test_delete_nonexistent_volume(self):

        self.stubs.Set(db, 'volume_get',

                       return_volume_nonexistent)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'DELETE'

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.delete, req, self.req_id, 'key1')

**** CubicPower OpenStack Study ****

    def test_delete_meta_not_found(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_empty_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key6')

        req.method = 'DELETE'

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.delete, req, self.req_id, 'key6')

**** CubicPower OpenStack Study ****

    def test_create(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_empty_volume_metadata)

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank('/v2/volume_metadata')

        req.method = 'POST'

        req.content_type = "application/json"

        body = {"metadata": {"key1": "value1",

                             "key2": "value2",

                             "key3": "value3", }}

        req.body = jsonutils.dumps(body)

        res_dict = self.controller.create(req, self.req_id, body)

        self.assertEqual(body, res_dict)

**** CubicPower OpenStack Study ****

    def test_create_with_keys_in_uppercase_and_lowercase(self):

        # if the keys in uppercase_and_lowercase, should return the one

        # which server added

        self.stubs.Set(db, 'volume_metadata_get',

                       return_empty_volume_metadata)

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata_insensitive)

        req = fakes.HTTPRequest.blank('/v2/volume_metadata')

        req.method = 'POST'

        req.content_type = "application/json"

        body = {"metadata": {"key1": "value1",

                             "KEY1": "value1",

                             "key2": "value2",

                             "KEY2": "value2",

                             "key3": "value3",

                             "KEY4": "value4"}}

        expected = {"metadata": {"key1": "value1",

                                 "key2": "value2",

                                 "key3": "value3",

                                 "KEY4": "value4"}}

        req.body = jsonutils.dumps(body)

        res_dict = self.controller.create(req, self.req_id, body)

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_create_empty_body(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'POST'

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.create, req, self.req_id, None)

**** CubicPower OpenStack Study ****

    def test_create_item_empty_key(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {"": "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.create, req, self.req_id, body)

**** CubicPower OpenStack Study ****

    def test_create_item_key_too_long(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {("a" * 260): "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.create,

                          req, self.req_id, body)

**** CubicPower OpenStack Study ****

    def test_create_nonexistent_volume(self):

        self.stubs.Set(db, 'volume_get',

                       return_volume_nonexistent)

        self.stubs.Set(db, 'volume_metadata_get',

                       return_volume_metadata)

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank('/v2/volume_metadata')

        req.method = 'POST'

        req.content_type = "application/json"

        body = {"metadata": {"key9": "value9"}}

        req.body = jsonutils.dumps(body)

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.create, req, self.req_id, body)

**** CubicPower OpenStack Study ****

    def test_update_all(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_new_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'PUT'

        req.content_type = "application/json"

        expected = {

            'metadata': {

                'key10': 'value10',

                'key99': 'value99',

                'KEY20': 'value20',

            },

        }

        req.body = jsonutils.dumps(expected)

        res_dict = self.controller.update_all(req, self.req_id, expected)

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_update_all_with_keys_in_uppercase_and_lowercase(self):

        self.stubs.Set(db, 'volume_metadata_get',

                       return_create_volume_metadata)

        self.stubs.Set(db, 'volume_metadata_update',

                       return_new_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'PUT'

        req.content_type = "application/json"

        body = {

            'metadata': {

                'key10': 'value10',

                'KEY10': 'value10',

                'key99': 'value99',

                'KEY20': 'value20',

            },

        }

        expected = {

            'metadata': {

                'key10': 'value10',

                'key99': 'value99',

                'KEY20': 'value20',

            },

        }

        req.body = jsonutils.dumps(expected)

        res_dict = self.controller.update_all(req, self.req_id, body)

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_update_all_empty_container(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_empty_container_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'PUT'

        req.content_type = "application/json"

        expected = {'metadata': {}}

        req.body = jsonutils.dumps(expected)

        res_dict = self.controller.update_all(req, self.req_id, expected)

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_update_all_malformed_container(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'PUT'

        req.content_type = "application/json"

        expected = {'meta': {}}

        req.body = jsonutils.dumps(expected)

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.update_all, req, self.req_id,

                          expected)

**** CubicPower OpenStack Study ****

    def test_update_all_malformed_data(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'PUT'

        req.content_type = "application/json"

        expected = {'metadata': ['asdf']}

        req.body = jsonutils.dumps(expected)

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.update_all, req, self.req_id,

                          expected)

**** CubicPower OpenStack Study ****

    def test_update_all_nonexistent_volume(self):

        self.stubs.Set(db, 'volume_get', return_volume_nonexistent)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'PUT'

        req.content_type = "application/json"

        body = {'metadata': {'key10': 'value10'}}

        req.body = jsonutils.dumps(body)

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.update_all, req, '100', body)

**** CubicPower OpenStack Study ****

    def test_update_item(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {"key1": "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        res_dict = self.controller.update(req, self.req_id, 'key1', body)

        expected = {'meta': {'key1': 'value1'}}

        self.assertEqual(expected, res_dict)

**** CubicPower OpenStack Study ****

    def test_update_item_nonexistent_volume(self):

        self.stubs.Set(db, 'volume_get',

                       return_volume_nonexistent)

        req = fakes.HTTPRequest.blank('/v2/fake/volumes/asdf/metadata/key1')

        req.method = 'PUT'

        body = {"meta": {"key1": "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.update, req, self.req_id, 'key1',

                          body)

**** CubicPower OpenStack Study ****

    def test_update_item_empty_body(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.update, req, self.req_id, 'key1',

                          None)

**** CubicPower OpenStack Study ****

    def test_update_item_empty_key(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {"": "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.update, req, self.req_id, '', body)

**** CubicPower OpenStack Study ****

    def test_update_item_key_too_long(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {("a" * 260): "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,

                          self.controller.update,

                          req, self.req_id, ("a" * 260), body)

**** CubicPower OpenStack Study ****

    def test_update_item_value_too_long(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {"key1": ("a" * 260)}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,

                          self.controller.update,

                          req, self.req_id, "key1", body)

**** CubicPower OpenStack Study ****

    def test_update_item_too_many_keys(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/key1')

        req.method = 'PUT'

        body = {"meta": {"key1": "value1", "key2": "value2"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.update, req, self.req_id, 'key1',

                          body)

**** CubicPower OpenStack Study ****

    def test_update_item_body_uri_mismatch(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url + '/bad')

        req.method = 'PUT'

        body = {"meta": {"key1": "value1"}}

        req.body = jsonutils.dumps(body)

        req.headers["content-type"] = "application/json"

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.update, req, self.req_id, 'bad',

                          body)

**** CubicPower OpenStack Study ****

    def test_invalid_metadata_items_on_create(self):

        self.stubs.Set(db, 'volume_metadata_update',

                       return_create_volume_metadata)

        req = fakes.HTTPRequest.blank(self.url)

        req.method = 'POST'

        req.headers["content-type"] = "application/json"

        #test for long key

        data = {"metadata": {"a" * 260: "value1"}}

        req.body = jsonutils.dumps(data)

        self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,

                          self.controller.create, req, self.req_id, data)

        #test for long value

        data = {"metadata": {"key": "v" * 260}}

        req.body = jsonutils.dumps(data)

        self.assertRaises(webob.exc.HTTPRequestEntityTooLarge,

                          self.controller.create, req, self.req_id, data)

        #test for empty key.

        data = {"metadata": {"": "value1"}}

        req.body = jsonutils.dumps(data)

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.create, req, self.req_id, data)