# Home
#
# OpenStack Study: test_volumes.py
#
# OpenStack Index
#
# **** CubicPower OpenStack Study ****

# Copyright 2013 Josh Durgin

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

import datetime

from lxml import etree

from oslo.config import cfg

import webob

from cinder.api import extensions

from cinder.api.v1 import volumes

from cinder import context

from cinder import db

from cinder import exception

from cinder import test

from cinder.tests.api import fakes

from cinder.tests.api.v2 import stubs

from cinder.tests import fake_notifier

from cinder.tests.image import fake as fake_image

from cinder.volume import api as volume_api

NS = '{http://docs.openstack.org/volume/api/v1}'

TEST_SNAPSHOT_UUID = '00000000-0000-0000-0000-000000000001'

CONF = cfg.CONF

**** CubicPower OpenStack Study ****

def stub_snapshot_get(self, context, snapshot_id):
    """Return a canned snapshot dict for TEST_SNAPSHOT_UUID.

    Any other ``snapshot_id`` raises ``exception.NotFound``.  The ``self``
    and ``context`` arguments are accepted but not used.
    """
    if snapshot_id == TEST_SNAPSHOT_UUID:
        snapshot = dict(
            id=snapshot_id,
            volume_id=12,
            status='available',
            volume_size=100,
            created_at=None,
            display_name='Default name',
            display_description='Default description',
        )
        return snapshot

    raise exception.NotFound

**** CubicPower OpenStack Study ****

class VolumeApiTest(test.TestCase):

**** CubicPower OpenStack Study ****

    def setUp(self):

        super(VolumeApiTest, self).setUp()

        self.ext_mgr = extensions.ExtensionManager()

        self.ext_mgr.extensions = {}

        fake_image.stub_out_image_service(self.stubs)

        self.controller = volumes.VolumeController(self.ext_mgr)

        self.flags(host='fake',

                   notification_driver=[fake_notifier.__name__])

        self.stubs.Set(db, 'volume_get_all', stubs.stub_volume_get_all)

        self.stubs.Set(db, 'service_get_all_by_topic',

                       stubs.stub_service_get_all_by_topic)

        self.stubs.Set(volume_api.API, 'delete', stubs.stub_volume_delete)

**** CubicPower OpenStack Study ****

    def tearDown(self):

        super(VolumeApiTest, self).tearDown()

        fake_notifier.reset()

**** CubicPower OpenStack Study ****

    def test_volume_create(self):

        self.stubs.Set(volume_api.API, 'get', stubs.stub_volume_get)

        self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)

        vol = {"size": 100,

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "zone1:host1"}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v1/volumes')

        res_dict = self.controller.create(req, body)

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'Volume Test Desc',

                               'availability_zone': 'zone1:host1',

                               'display_name': 'Volume Test Name',

                               'encrypted': False,

                               'attachments': [{'device': '/',

                                                'server_id': 'fakeuuid',

                                                'host_name': None,

                                                'id': '1',

                                                'volume_id': '1'}],

                               'bootable': 'false',

                               'volume_type': 'vol_type_name',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'attached_mode': 'rw',

                                            'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 100,

                               'encrypted': False}}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_create_with_type(self):

        vol_type = CONF.default_volume_type

        db.volume_type_create(context.get_admin_context(),

                              dict(name=vol_type, extra_specs={}))

        db_vol_type = db.volume_type_get_by_name(context.get_admin_context(),

                                                 vol_type)

        vol = {"size": 100,

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "zone1:host1",

               "volume_type": "FakeTypeName"}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v1/volumes')

        # Raise 404 when type name isn't valid

        self.assertRaises(webob.exc.HTTPNotFound, self.controller.create,

                          req, body)

        # Use correct volume type name

        vol.update(dict(volume_type=CONF.default_volume_type))

        body.update(dict(volume=vol))

        res_dict = self.controller.create(req, body)

        volume_id = res_dict['volume']['id']

        self.assertEqual(len(res_dict), 1)

        self.assertEqual(res_dict['volume']['volume_type'],

                         db_vol_type['name'])

        # Use correct volume type id

        vol.update(dict(volume_type=db_vol_type['id']))

        body.update(dict(volume=vol))

        res_dict = self.controller.create(req, body)

        volume_id = res_dict['volume']['id']

        self.assertEqual(len(res_dict), 1)

        self.assertEqual(res_dict['volume']['volume_type'],

                         db_vol_type['name'])

**** CubicPower OpenStack Study ****

    def test_volume_creation_fails_with_bad_size(self):

        vol = {"size": '',

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "zone1:host1"}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v1/volumes')

        self.assertRaises(exception.InvalidInput,

                          self.controller.create,

                          req,

                          body)

**** CubicPower OpenStack Study ****

    def test_volume_creation_fails_with_bad_availability_zone(self):

        vol = {"size": '1',

               "name": "Volume Test Name",

               "description": "Volume Test Desc",

               "availability_zone": "zonen:hostn"}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v2/volumes')

        self.assertRaises(exception.InvalidInput,

                          self.controller.create,

                          req, body)

**** CubicPower OpenStack Study ****

    def test_volume_create_with_image_id(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)

        self.ext_mgr.extensions = {'os-image-create': 'fake'}

        test_id = "c905cedb-7281-47e4-8a62-f26bc5fc4c77"

        vol = {"size": '1',

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "nova",

               "imageRef": test_id}

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'Volume Test Desc',

                               'availability_zone': 'nova',

                               'display_name': 'Volume Test Name',

                               'encrypted': False,

                               'attachments': [{'device': '/',

                                                'server_id': 'fakeuuid',

                                                'host_name': None,

                                                'id': '1',

                                                'volume_id': '1'}],

                               'bootable': 'false',

                               'volume_type': 'vol_type_name',

                               'image_id': test_id,

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'attached_mode': 'rw',

                                            'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': '1'}}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v1/volumes')

        res_dict = self.controller.create(req, body)

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_create_with_image_id_is_integer(self):

        self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)

        self.ext_mgr.extensions = {'os-image-create': 'fake'}

        vol = {"size": '1',

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "cinder",

               "imageRef": 1234}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v1/volumes')

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.create,

                          req,

                          body)

**** CubicPower OpenStack Study ****

    def test_volume_create_with_image_id_not_uuid_format(self):

        self.stubs.Set(volume_api.API, "create", stubs.stub_volume_create)

        self.ext_mgr.extensions = {'os-image-create': 'fake'}

        vol = {"size": '1',

               "display_name": "Volume Test Name",

               "display_description": "Volume Test Desc",

               "availability_zone": "cinder",

               "imageRef": '12345'}

        body = {"volume": vol}

        req = fakes.HTTPRequest.blank('/v1/volumes')

        self.assertRaises(webob.exc.HTTPBadRequest,

                          self.controller.create,

                          req,

                          body)

**** CubicPower OpenStack Study ****

    def test_volume_update(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)

        updates = {

            "display_name": "Updated Test Name",

        }

        body = {"volume": updates}

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)

        res_dict = self.controller.update(req, '1', body)

        expected = {'volume': {

            'status': 'fakestatus',

            'display_description': 'displaydesc',

            'availability_zone': 'fakeaz',

            'display_name': 'Updated Test Name',

            'encrypted': False,

            'attachments': [{

                'id': '1',

                'volume_id': '1',

                'server_id': 'fakeuuid',

                'host_name': None,

                'device': '/'

            }],

            'bootable': 'false',

            'volume_type': 'vol_type_name',

            'snapshot_id': None,

            'source_volid': None,

            'metadata': {'attached_mode': 'rw',

                         'readonly': 'False'},

            'id': '1',

            'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),

            'size': 1}}

        self.assertEqual(res_dict, expected)

        self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)

**** CubicPower OpenStack Study ****

    def test_volume_update_metadata(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)

        updates = {

            "metadata": {"qos_max_iops": 2000}

        }

        body = {"volume": updates}

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)

        res_dict = self.controller.update(req, '1', body)

        expected = {'volume': {

            'status': 'fakestatus',

            'display_description': 'displaydesc',

            'availability_zone': 'fakeaz',

            'display_name': 'displayname',

            'encrypted': False,

            'attachments': [{

                'id': '1',

                'volume_id': '1',

                'server_id': 'fakeuuid',

                'host_name': None,

                'device': '/'

            }],

            'bootable': 'false',

            'volume_type': 'vol_type_name',

            'snapshot_id': None,

            'source_volid': None,

            'metadata': {"qos_max_iops": 2000,

                         "readonly": "False",

                         "attached_mode": "rw"},

            'id': '1',

            'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),

            'size': 1

        }}

        self.assertEqual(res_dict, expected)

        self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)

**** CubicPower OpenStack Study ****

    def test_volume_update_with_admin_metadata(self):

        self.stubs.Set(volume_api.API, "update", stubs.stub_volume_update)

        volume = stubs.stub_volume("1")

        del volume['name']

        del volume['volume_type']

        del volume['volume_type_id']

        volume['metadata'] = {'key': 'value'}

        db.volume_create(context.get_admin_context(), volume)

        db.volume_admin_metadata_update(context.get_admin_context(), "1",

                                        {"readonly": "True",

                                         "invisible_key": "invisible_value"},

                                        False)

        updates = {

            "display_name": "Updated Test Name",

        }

        body = {"volume": updates}

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertEqual(len(fake_notifier.NOTIFICATIONS), 0)

        admin_ctx = context.RequestContext('admin', 'fakeproject', True)

        req.environ['cinder.context'] = admin_ctx

        res_dict = self.controller.update(req, '1', body)

        expected = {'volume': {

            'status': 'fakestatus',

            'display_description': 'displaydesc',

            'availability_zone': 'fakeaz',

            'display_name': 'Updated Test Name',

            'encrypted': False,

            'attachments': [{

                'id': '1',

                'volume_id': '1',

                'server_id': 'fakeuuid',

                'host_name': None,

                'device': '/'

            }],

            'bootable': 'false',

            'volume_type': 'None',

            'snapshot_id': None,

            'source_volid': None,

            'metadata': {'key': 'value',

                         'readonly': 'True'},

            'id': '1',

            'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),

            'size': 1}}

        self.assertEqual(res_dict, expected)

        self.assertEqual(len(fake_notifier.NOTIFICATIONS), 2)

**** CubicPower OpenStack Study ****

    def test_update_empty_body(self):

        body = {}

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertRaises(webob.exc.HTTPUnprocessableEntity,

                          self.controller.update,

                          req, '1', body)

**** CubicPower OpenStack Study ****

    def test_update_invalid_body(self):

        body = {'display_name': 'missing top level volume key'}

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertRaises(webob.exc.HTTPUnprocessableEntity,

                          self.controller.update,

                          req, '1', body)

**** CubicPower OpenStack Study ****

    def test_update_not_found(self):

        self.stubs.Set(volume_api.API, "get", stubs.stub_volume_get_notfound)

        updates = {

            "display_name": "Updated Test Name",

        }

        body = {"volume": updates}

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.update,

                          req, '1', body)

**** CubicPower OpenStack Study ****

    def test_volume_list(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        self.stubs.Set(volume_api.API, 'get_all',

                       stubs.stub_volume_get_all_by_project)

        req = fakes.HTTPRequest.blank('/v1/volumes')

        res_dict = self.controller.index(req)

        expected = {'volumes': [{'status': 'fakestatus',

                                 'display_description': 'displaydesc',

                                 'availability_zone': 'fakeaz',

                                 'display_name': 'displayname',

                                 'encrypted': False,

                                 'attachments': [{'device': '/',

                                                  'server_id': 'fakeuuid',

                                                  'host_name': None,

                                                  'id': '1',

                                                  'volume_id': '1'}],

                                 'bootable': 'false',

                                 'volume_type': 'vol_type_name',

                                 'snapshot_id': None,

                                 'source_volid': None,

                                 'metadata': {'attached_mode': 'rw',

                                              'readonly': 'False'},

                                 'id': '1',

                                 'created_at': datetime.datetime(1, 1, 1,

                                                                 1, 1, 1),

                                 'size': 1}]}

        self.assertEqual(res_dict, expected)

        # Finally test that we cached the returned volumes

        self.assertEqual(1, len(req.cached_resource()))

**** CubicPower OpenStack Study ****

    def test_volume_list_with_admin_metadata(self):

        volume = stubs.stub_volume("1")

        del volume['name']

        del volume['volume_type']

        del volume['volume_type_id']

        volume['metadata'] = {'key': 'value'}

        db.volume_create(context.get_admin_context(), volume)

        db.volume_admin_metadata_update(context.get_admin_context(), "1",

                                        {"readonly": "True",

                                         "invisible_key": "invisible_value"},

                                        False)

        req = fakes.HTTPRequest.blank('/v1/volumes')

        admin_ctx = context.RequestContext('admin', 'fakeproject', True)

        req.environ['cinder.context'] = admin_ctx

        res_dict = self.controller.index(req)

        expected = {'volumes': [{'status': 'fakestatus',

                                 'display_description': 'displaydesc',

                                 'availability_zone': 'fakeaz',

                                 'display_name': 'displayname',

                                 'encrypted': False,

                                 'attachments': [{'device': '/',

                                                  'server_id': 'fakeuuid',

                                                  'host_name': None,

                                                  'id': '1',

                                                  'volume_id': '1'}],

                                 'bootable': 'false',

                                 'volume_type': 'None',

                                 'snapshot_id': None,

                                 'source_volid': None,

                                 'metadata': {'key': 'value',

                                              'readonly': 'True'},

                                 'id': '1',

                                 'created_at': datetime.datetime(1, 1, 1,

                                                                 1, 1, 1),

                                 'size': 1}]}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_list_detail(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        self.stubs.Set(volume_api.API, 'get_all',

                       stubs.stub_volume_get_all_by_project)

        req = fakes.HTTPRequest.blank('/v1/volumes/detail')

        res_dict = self.controller.index(req)

        expected = {'volumes': [{'status': 'fakestatus',

                                 'display_description': 'displaydesc',

                                 'availability_zone': 'fakeaz',

                                 'display_name': 'displayname',

                                 'encrypted': False,

                                 'attachments': [{'device': '/',

                                                  'server_id': 'fakeuuid',

                                                  'host_name': None,

                                                  'id': '1',

                                                  'volume_id': '1'}],

                                 'bootable': 'false',

                                 'volume_type': 'vol_type_name',

                                 'snapshot_id': None,

                                 'source_volid': None,

                                 'metadata': {'attached_mode': 'rw',

                                              'readonly': 'False'},

                                 'id': '1',

                                 'created_at': datetime.datetime(1, 1, 1,

                                                                 1, 1, 1),

                                 'size': 1}]}

        self.assertEqual(res_dict, expected)

        # Finally test that we cached the returned volumes

        self.assertEqual(1, len(req.cached_resource()))

**** CubicPower OpenStack Study ****

    def test_volume_list_detail_with_admin_metadata(self):

        volume = stubs.stub_volume("1")

        del volume['name']

        del volume['volume_type']

        del volume['volume_type_id']

        volume['metadata'] = {'key': 'value'}

        db.volume_create(context.get_admin_context(), volume)

        db.volume_admin_metadata_update(context.get_admin_context(), "1",

                                        {"readonly": "True",

                                         "invisible_key": "invisible_value"},

                                        False)

        req = fakes.HTTPRequest.blank('/v1/volumes/detail')

        admin_ctx = context.RequestContext('admin', 'fakeproject', True)

        req.environ['cinder.context'] = admin_ctx

        res_dict = self.controller.index(req)

        expected = {'volumes': [{'status': 'fakestatus',

                                 'display_description': 'displaydesc',

                                 'availability_zone': 'fakeaz',

                                 'display_name': 'displayname',

                                 'encrypted': False,

                                 'attachments': [{'device': '/',

                                                  'server_id': 'fakeuuid',

                                                  'host_name': None,

                                                  'id': '1',

                                                  'volume_id': '1'}],

                                 'bootable': 'false',

                                 'volume_type': 'None',

                                 'snapshot_id': None,

                                 'source_volid': None,

                                 'metadata': {'key': 'value',

                                              'readonly': 'True'},

                                 'id': '1',

                                 'created_at': datetime.datetime(1, 1, 1,

                                                                 1, 1, 1),

                                 'size': 1}]}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_show(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, '1')

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'displaydesc',

                               'availability_zone': 'fakeaz',

                               'display_name': 'displayname',

                               'encrypted': False,

                               'attachments': [{'device': '/',

                                                'server_id': 'fakeuuid',

                                                'host_name': None,

                                                'id': '1',

                                                'volume_id': '1'}],

                               'bootable': 'false',

                               'volume_type': 'vol_type_name',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'attached_mode': 'rw',

                                            'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 1}}

        self.assertEqual(res_dict, expected)

        # Finally test that we cached the returned volume

        self.assertIsNotNone(req.cached_resource_by_id('1'))

**** CubicPower OpenStack Study ****

    def test_volume_show_no_attachments(self):

        def stub_volume_get(self, context, volume_id):

            return stubs.stub_volume(volume_id, attach_status='detached')

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, '1')

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'displaydesc',

                               'availability_zone': 'fakeaz',

                               'display_name': 'displayname',

                               'encrypted': False,

                               'attachments': [],

                               'bootable': 'false',

                               'volume_type': 'vol_type_name',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 1}}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

        def stub_volume_get(self, context, volume_id):

            return stubs.stub_volume(volume_id, attach_status='detached')

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, '1')

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'displaydesc',

                               'availability_zone': 'fakeaz',

                               'display_name': 'displayname',

                               'encrypted': False,

                               'attachments': [],

                               'bootable': 'false',

                               'volume_type': 'vol_type_name',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 1}}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_show_bootable(self):

        def stub_volume_get(self, context, volume_id):

            return (stubs.stub_volume(volume_id,

                    volume_glance_metadata=dict(foo='bar')))

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, '1')

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'displaydesc',

                               'availability_zone': 'fakeaz',

                               'display_name': 'displayname',

                               'encrypted': False,

                               'attachments': [{'device': '/',

                                                'server_id': 'fakeuuid',

                                                'host_name': None,

                                                'id': '1',

                                                'volume_id': '1'}],

                               'bootable': 'true',

                               'volume_type': 'vol_type_name',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'attached_mode': 'rw',

                                            'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 1}}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

        # NOTE(review): headerless fragment -- no enclosing ``def`` line is
        # visible for these statements.  They appear to repeat the tail of the
        # test method that ends just above; confirm before relying on them.
        def stub_volume_get(self, context, volume_id):

            # Stub volume built with volume_glance_metadata={'foo': 'bar'}.
            return (stubs.stub_volume(volume_id,

                    volume_glance_metadata=dict(foo='bar')))

        # Route volume_api.API.get through the stub defined above.
        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, '1')

        # Full view the controller is expected to return for the stub volume;
        # note 'bootable' is expected to be the string 'true'.
        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'displaydesc',

                               'availability_zone': 'fakeaz',

                               'display_name': 'displayname',

                               'encrypted': False,

                               'attachments': [{'device': '/',

                                                'server_id': 'fakeuuid',

                                                'host_name': None,

                                                'id': '1',

                                                'volume_id': '1'}],

                               'bootable': 'true',

                               'volume_type': 'vol_type_name',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'attached_mode': 'rw',

                                            'readonly': 'False'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 1}}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_show_no_volume(self):

        self.stubs.Set(volume_api.API, "get", stubs.stub_volume_get_notfound)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.show,

                          req,

                          1)

        # Finally test that we did not cache anything

        self.assertIsNone(req.cached_resource_by_id('1'))

**** CubicPower OpenStack Study ****

    def test_volume_detail_limit_offset(self):

        def volume_detail_limit_offset(is_admin):

            def stub_volume_get_all_by_project(context, project_id, marker,

                                               limit, sort_key, sort_dir,

                                               filters=None):

                return [

                    stubs.stub_volume(1, display_name='vol1'),

                    stubs.stub_volume(2, display_name='vol2'),

                ]

            self.stubs.Set(db, 'volume_get_all_by_project',

                           stub_volume_get_all_by_project)

            self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

            req = fakes.HTTPRequest.blank('/v1/volumes/detail?limit=2\

                                          &offset=1',

                                          use_admin_context=is_admin)

            res_dict = self.controller.index(req)

            volumes = res_dict['volumes']

            self.assertEqual(len(volumes), 1)

            self.assertEqual(volumes[0]['id'], 2)

        #admin case

        volume_detail_limit_offset(is_admin=True)

        #non_admin case

        volume_detail_limit_offset(is_admin=False)

**** CubicPower OpenStack Study ****

        # NOTE(review): headerless fragment -- these statements repeat the
        # body of test_volume_detail_limit_offset directly above and have no
        # ``def`` line of their own; confirm whether they should exist at all.
        def volume_detail_limit_offset(is_admin):

            # DB stub: ignores its arguments and returns two fixed volumes.
            def stub_volume_get_all_by_project(context, project_id, marker,

                                               limit, sort_key, sort_dir,

                                               filters=None):

                return [

                    stubs.stub_volume(1, display_name='vol1'),

                    stubs.stub_volume(2, display_name='vol2'),

                ]

            self.stubs.Set(db, 'volume_get_all_by_project',

                           stub_volume_get_all_by_project)

            self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

            # NOTE(review): the backslash continuation sits inside the string
            # literal, so the continuation line's leading whitespace becomes
            # part of the URL.
            req = fakes.HTTPRequest.blank('/v1/volumes/detail?limit=2\

                                          &offset=1',

                                          use_admin_context=is_admin)

            res_dict = self.controller.index(req)

            volumes = res_dict['volumes']

            # With offset=1 only the second stub volume (id 2) should remain.
            self.assertEqual(len(volumes), 1)

            self.assertEqual(volumes[0]['id'], 2)

        #admin case

        volume_detail_limit_offset(is_admin=True)

        #non_admin case

        volume_detail_limit_offset(is_admin=False)

**** CubicPower OpenStack Study ****

    def test_volume_show_with_admin_metadata(self):

        volume = stubs.stub_volume("1")

        del volume['name']

        del volume['volume_type']

        del volume['volume_type_id']

        volume['metadata'] = {'key': 'value'}

        db.volume_create(context.get_admin_context(), volume)

        db.volume_admin_metadata_update(context.get_admin_context(), "1",

                                        {"readonly": "True",

                                         "invisible_key": "invisible_value"},

                                        False)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        admin_ctx = context.RequestContext('admin', 'fakeproject', True)

        req.environ['cinder.context'] = admin_ctx

        res_dict = self.controller.show(req, '1')

        expected = {'volume': {'status': 'fakestatus',

                               'display_description': 'displaydesc',

                               'availability_zone': 'fakeaz',

                               'display_name': 'displayname',

                               'encrypted': False,

                               'attachments': [{'device': '/',

                                                'server_id': 'fakeuuid',

                                                'host_name': None,

                                                'id': '1',

                                                'volume_id': '1'}],

                               'bootable': 'false',

                               'volume_type': 'None',

                               'snapshot_id': None,

                               'source_volid': None,

                               'metadata': {'key': 'value',

                                            'readonly': 'True'},

                               'id': '1',

                               'created_at': datetime.datetime(1, 1, 1,

                                                               1, 1, 1),

                               'size': 1}}

        self.assertEqual(res_dict, expected)

**** CubicPower OpenStack Study ****

    def test_volume_show_with_encrypted_volume(self):

        def stub_volume_get(self, context, volume_id):

            return stubs.stub_volume(volume_id, encryption_key_id='fake_id')

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, 1)

        self.assertEqual(res_dict['volume']['encrypted'], True)

**** CubicPower OpenStack Study ****

        # NOTE(review): headerless fragment -- these statements repeat the
        # body of test_volume_show_with_encrypted_volume directly above and
        # have no ``def`` line of their own.
        def stub_volume_get(self, context, volume_id):

            # Stub volume carrying a non-empty encryption key id.
            return stubs.stub_volume(volume_id, encryption_key_id='fake_id')

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, 1)

        self.assertEqual(res_dict['volume']['encrypted'], True)

**** CubicPower OpenStack Study ****

    def test_volume_show_with_unencrypted_volume(self):

        def stub_volume_get(self, context, volume_id):

            return stubs.stub_volume(volume_id, encryption_key_id=None)

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, 1)

        self.assertEqual(res_dict['volume']['encrypted'], False)

**** CubicPower OpenStack Study ****

        # NOTE(review): headerless fragment -- these statements repeat the
        # body of test_volume_show_with_unencrypted_volume directly above and
        # have no ``def`` line of their own.
        def stub_volume_get(self, context, volume_id):

            # Stub volume with no encryption key id at all.
            return stubs.stub_volume(volume_id, encryption_key_id=None)

        self.stubs.Set(volume_api.API, 'get', stub_volume_get)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        res_dict = self.controller.show(req, 1)

        self.assertEqual(res_dict['volume']['encrypted'], False)

**** CubicPower OpenStack Study ****

    def test_volume_delete(self):

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        resp = self.controller.delete(req, 1)

        self.assertEqual(resp.status_int, 202)

**** CubicPower OpenStack Study ****

    def test_volume_delete_no_volume(self):

        self.stubs.Set(volume_api.API, "get", stubs.stub_volume_get_notfound)

        req = fakes.HTTPRequest.blank('/v1/volumes/1')

        self.assertRaises(webob.exc.HTTPNotFound,

                          self.controller.delete,

                          req,

                          1)

**** CubicPower OpenStack Study ****

    def test_admin_list_volumes_limited_to_project(self):

        self.stubs.Set(db, 'volume_get_all_by_project',

                       stubs.stub_volume_get_all_by_project)

        req = fakes.HTTPRequest.blank('/v1/fake/volumes',

                                      use_admin_context=True)

        res = self.controller.index(req)

        self.assertIn('volumes', res)

        self.assertEqual(1, len(res['volumes']))

**** CubicPower OpenStack Study ****

    def test_admin_list_volumes_all_tenants(self):

        req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1',

                                      use_admin_context=True)

        res = self.controller.index(req)

        self.assertIn('volumes', res)

        self.assertEqual(3, len(res['volumes']))

**** CubicPower OpenStack Study ****

    def test_all_tenants_non_admin_gets_all_tenants(self):

        self.stubs.Set(db, 'volume_get_all_by_project',

                       stubs.stub_volume_get_all_by_project)

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        req = fakes.HTTPRequest.blank('/v1/fake/volumes?all_tenants=1')

        res = self.controller.index(req)

        self.assertIn('volumes', res)

        self.assertEqual(1, len(res['volumes']))

**** CubicPower OpenStack Study ****

    def test_non_admin_get_by_project(self):

        self.stubs.Set(db, 'volume_get_all_by_project',

                       stubs.stub_volume_get_all_by_project)

        self.stubs.Set(db, 'volume_get', stubs.stub_volume_get_db)

        req = fakes.HTTPRequest.blank('/v1/fake/volumes')

        res = self.controller.index(req)

        self.assertIn('volumes', res)

        self.assertEqual(1, len(res['volumes']))

**** CubicPower OpenStack Study ****

    def test_add_visible_admin_metadata_visible_key_only(self):

        admin_metadata = [{"key": "invisible_key", "value": "invisible_value"},

                          {"key": "readonly", "value": "visible"},

                          {"key": "attached_mode", "value": "visible"}]

        metadata = [{"key": "key", "value": "value"}]

        volume = dict(volume_admin_metadata=admin_metadata,

                      volume_metadata=metadata)

        admin_ctx = context.get_admin_context()

        self.controller._add_visible_admin_metadata(admin_ctx,

                                                    volume)

        self.assertEqual(volume['volume_metadata'],

                         [{"key": "key", "value": "value"},

                          {"key": "readonly", "value": "visible"},

                          {"key": "attached_mode", "value": "visible"}])

        admin_metadata = {"invisible_key": "invisible_value",

                          "readonly": "visible",

                          "attached_mode": "visible"}

        metadata = {"key": "value"}

        volume = dict(admin_metadata=admin_metadata,

                      metadata=metadata)

        admin_ctx = context.get_admin_context()

        self.controller._add_visible_admin_metadata(admin_ctx,

                                                    volume)

        self.assertEqual(volume['metadata'],

                         {'key': 'value',

                          'attached_mode': 'visible',

                          'readonly': 'visible'})

**** CubicPower OpenStack Study ****

class VolumeSerializerTest(test.TestCase):
    """Checks the XML emitted by the v1 volume serializer templates."""

    def _verify_volume_attachment(self, attach, tree):
        """Assert that the element's attributes match the ``attach`` dict.

        :param attach: attachment dict from the raw volume
        :param tree: parsed XML element for that attachment
        """
        for attr in ('id', 'volume_id', 'server_id', 'device'):
            self.assertEqual(str(attach[attr]), tree.get(attr))

    def _verify_volume(self, vol, tree):
        """Assert that a ``volume`` element mirrors the raw volume dict.

        :param vol: raw volume dict handed to the serializer
        :param tree: parsed XML element for that volume
        """
        self.assertEqual(tree.tag, NS + 'volume')

        for attr in ('id', 'status', 'size', 'availability_zone', 'created_at',
                     'display_name', 'display_description', 'volume_type',
                     'bootable', 'snapshot_id'):
            self.assertEqual(str(vol[attr]), tree.get(attr))

        # Child tags are namespace-qualified (see the assertIn below), so the
        # branch conditions must compare against the qualified names too.
        # Comparing with the bare 'attachments' / 'metadata' names never
        # matched, which silently skipped both verification branches.
        attachments_tag = NS + 'attachments'
        metadata_tag = NS + 'metadata'

        for child in tree:
            self.assertIn(child.tag, (attachments_tag, metadata_tag))
            if child.tag == attachments_tag:
                self.assertEqual(1, len(child))
                self.assertEqual(NS + 'attachment', child[0].tag)
                self._verify_volume_attachment(vol['attachments'][0], child[0])
            elif child.tag == metadata_tag:
                # Every metadata key must appear exactly once in the XML.
                not_seen = set(vol['metadata'].keys())
                for gr_child in child:
                    key = gr_child.get("key")
                    self.assertIn(key, not_seen)
                    self.assertEqual(str(vol['metadata'][key]),
                                     gr_child.text)
                    not_seen.remove(key)
                self.assertEqual(0, len(not_seen))

    def test_volume_show_create_serializer(self):
        """VolumeTemplate serializes a single volume that verifies cleanly."""
        serializer = volumes.VolumeTemplate()
        raw_volume = dict(
            id='vol_id',
            status='vol_status',
            size=1024,
            availability_zone='vol_availability',
            bootable='false',
            created_at=datetime.datetime.now(),
            attachments=[dict(id='vol_id',
                              volume_id='vol_id',
                              server_id='instance_uuid',
                              device='/foo')],
            display_name='vol_name',
            display_description='vol_desc',
            volume_type='vol_type',
            snapshot_id='snap_id',
            source_volid='source_volid',
            metadata=dict(foo='bar',
                          baz='quux', ), )
        text = serializer.serialize(dict(volume=raw_volume))

        tree = etree.fromstring(text)

        self._verify_volume(raw_volume, tree)

    def test_volume_index_detail_serializer(self):
        """VolumesTemplate serializes a list; each child verifies cleanly."""
        def raw_volume(number):
            # Both fixtures share one shape and differ only in the number
            # embedded in their string values.
            vol = 'vol%d' % number
            return dict(id=vol + '_id',
                        status=vol + '_status',
                        size=1024,
                        availability_zone=vol + '_availability',
                        bootable='true',
                        created_at=datetime.datetime.now(),
                        attachments=[dict(id=vol + '_id',
                                          volume_id=vol + '_id',
                                          server_id='instance_uuid',
                                          device='/foo%d' % number)],
                        display_name=vol + '_name',
                        display_description=vol + '_desc',
                        volume_type=vol + '_type',
                        snapshot_id='snap%d_id' % number,
                        source_volid=None,
                        metadata=dict(foo=vol + '_foo',
                                      bar=vol + '_bar', ), )

        serializer = volumes.VolumesTemplate()
        raw_volumes = [raw_volume(1), raw_volume(2)]
        text = serializer.serialize(dict(volumes=raw_volumes))

        tree = etree.fromstring(text)

        self.assertEqual(NS + 'volumes', tree.tag)
        self.assertEqual(len(raw_volumes), len(tree))
        for idx, child in enumerate(tree):
            self._verify_volume(raw_volumes[idx], child)

**** CubicPower OpenStack Study ****

class TestVolumeCreateRequestXMLDeserializer(test.TestCase):
    """Checks that volume-create XML requests deserialize to the right body.

    NOTE(review): the request literals had lost their XML markup (only the
    attribute lines, a dangling ``>`` and the text ``work`` remained).  The
    ``<volume>`` / ``<metadata><meta key="Type">`` elements below were
    reconstructed from those remnants and from the expected dictionaries;
    the ``xmlns`` value is an assumption -- confirm against the upstream file.
    """

    def setUp(self):
        super(TestVolumeCreateRequestXMLDeserializer, self).setUp()
        self.deserializer = volumes.CreateDeserializer()

    def _assert_volume_body(self, xml, volume):
        """Deserialize ``xml`` and compare its body with ``{'volume': ...}``.

        :param xml: request document passed to the deserializer
        :param volume: expected content of the ``volume`` entry of the body
        """
        request = self.deserializer.deserialize(xml)
        self.assertEqual({"volume": volume}, request['body'])

    def test_minimal_volume(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"></volume>"""
        self._assert_volume_body(self_request, {"size": "1", })

    def test_display_name(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
        }
        self._assert_volume_body(self_request, expected)

    def test_display_description(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
        }
        self._assert_volume_body(self_request, expected)

    def test_volume_type(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"
        volume_type="289da7f8-6440-407c-9fb4-7db01ec49164"></volume>"""
        # "display_name" used to be listed twice in this literal; the
        # duplicate key was redundant and has been dropped.
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
            "volume_type": "289da7f8-6440-407c-9fb4-7db01ec49164",
        }
        self._assert_volume_body(self_request, expected)

    def test_availability_zone(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"
        volume_type="289da7f8-6440-407c-9fb4-7db01ec49164"
        availability_zone="us-east1"></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
            "volume_type": "289da7f8-6440-407c-9fb4-7db01ec49164",
            "availability_zone": "us-east1",
        }
        self._assert_volume_body(self_request, expected)

    def test_metadata(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        display_name="Volume-xml"
        size="1">
        <metadata><meta key="Type">work</meta></metadata></volume>"""
        expected = {
            "display_name": "Volume-xml",
            "size": "1",
            "metadata": {
                "Type": "work",
            },
        }
        self._assert_volume_body(self_request, expected)

    def test_full_volume(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"
        volume_type="289da7f8-6440-407c-9fb4-7db01ec49164"
        availability_zone="us-east1">
        <metadata><meta key="Type">work</meta></metadata></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
            "volume_type": "289da7f8-6440-407c-9fb4-7db01ec49164",
            "availability_zone": "us-east1",
            "metadata": {
                "Type": "work",
            },
        }
        self._assert_volume_body(self_request, expected)

    def test_imageref(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"
        imageRef="4a90189d-d702-4c7c-87fc-6608c554d737"></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
            "imageRef": "4a90189d-d702-4c7c-87fc-6608c554d737",
        }
        self._assert_volume_body(self_request, expected)

    def test_snapshot_id(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"
        snapshot_id="4a90189d-d702-4c7c-87fc-6608c554d737"></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
            "snapshot_id": "4a90189d-d702-4c7c-87fc-6608c554d737",
        }
        self._assert_volume_body(self_request, expected)

    def test_source_volid(self):
        self_request = """
<volume xmlns="http://docs.openstack.org/compute/api/v1.1"
        size="1"
        display_name="Volume-xml"
        display_description="description"
        source_volid="4a90189d-d702-4c7c-87fc-6608c554d737"></volume>"""
        expected = {
            "size": "1",
            "display_name": "Volume-xml",
            "display_description": "description",
            "source_volid": "4a90189d-d702-4c7c-87fc-6608c554d737",
        }
        self._assert_volume_body(self_request, expected)

**** CubicPower OpenStack Study ****

class VolumesUnprocessableEntityTestCase(test.TestCase):
    """Tests of places we throw 422 Unprocessable Entity from."""

    def setUp(self):
        super(VolumesUnprocessableEntityTestCase, self).setUp()
        # Controller under test, built on an extension manager that has no
        # extensions registered.
        self.ext_mgr = extensions.ExtensionManager()
        self.ext_mgr.extensions = {}
        self.controller = volumes.VolumeController(self.ext_mgr)

    def _unprocessable_volume_create(self, body):
        """Assert that create() rejects ``body`` with HTTPUnprocessableEntity.

        :param body: request body handed straight to the controller
        """
        # NOTE(review): the path says /v2 although the controller under test
        # comes from cinder.api.v1 -- left as found; confirm it is intended.
        req = fakes.HTTPRequest.blank('/v2/fake/volumes')
        req.method = 'POST'

        self.assertRaises(webob.exc.HTTPUnprocessableEntity,
                          self.controller.create, req, body)

    def test_create_no_body(self):
        """A missing body is rejected."""
        self._unprocessable_volume_create(body=None)

    def test_create_missing_volume(self):
        """A body without a 'volume' entry is rejected."""
        body = {'foo': {'a': 'b'}}
        self._unprocessable_volume_create(body=body)

    def test_create_malformed_entity(self):
        """A 'volume' entry that is a plain string is rejected."""
        body = {'volume': 'string'}
        self._unprocessable_volume_create(body=body)