¡@

Home 

OpenStack Study: stubs.py

OpenStack Index

**** CubicPower OpenStack Study ****

def stub_volume(id, **kwargs):

    volume = {

        'id': id,

        'user_id': 'fakeuser',

        'project_id': 'fakeproject',

        'host': 'fakehost',

        'size': 1,

        'availability_zone': 'fakeaz',

        'instance_uuid': 'fakeuuid',

        'attached_host': None,

        'mountpoint': '/',

        'status': 'fakestatus',

        'migration_status': None,

        'attach_status': 'attached',

        'bootable': 'false',

        'name': 'vol name',

        'display_name': 'displayname',

        'display_description': 'displaydesc',

        'created_at': datetime.datetime(1, 1, 1, 1, 1, 1),

        'snapshot_id': None,

        'source_volid': None,

        'volume_type_id': '3e196c20-3c06-11e2-81c1-0800200c9a66',

        'encryption_key_id': None,

        'volume_admin_metadata': [{'key': 'attached_mode', 'value': 'rw'},

                                  {'key': 'readonly', 'value': 'False'}],

        'bootable': False,

        'launched_at': datetime.datetime(1, 1, 1, 1, 1, 1),

        'volume_type': {'name': 'vol_type_name'}}

    volume.update(kwargs)

    if kwargs.get('volume_glance_metadata', None):

        volume['bootable'] = True

    if kwargs.get('attach_status') == 'detached':

        del volume['volume_admin_metadata'][0]

    return volume

**** CubicPower OpenStack Study ****

def stub_volume_create(self, context, size, name, description, snapshot,

                       **param):

    vol = stub_volume('1')

    vol['size'] = size

    vol['display_name'] = name

    vol['display_description'] = description

    vol['source_volid'] = None

    vol['bootable'] = False

    try:

        vol['snapshot_id'] = snapshot['id']

    except (KeyError, TypeError):

        vol['snapshot_id'] = None

    vol['availability_zone'] = param.get('availability_zone', 'fakeaz')

    return vol

**** CubicPower OpenStack Study ****

def stub_volume_create_from_image(self, context, size, name, description,

                                  snapshot, volume_type, metadata,

                                  availability_zone):

    vol = stub_volume('1')

    vol['status'] = 'creating'

    vol['size'] = size

    vol['display_name'] = name

    vol['display_description'] = description

    vol['availability_zone'] = 'cinder'

    vol['bootable'] = False

    return vol

**** CubicPower OpenStack Study ****

def stub_volume_update(self, context, *args, **param):

    pass

**** CubicPower OpenStack Study ****

def stub_volume_delete(self, context, *args, **param):

    pass

**** CubicPower OpenStack Study ****

def stub_volume_get(self, context, volume_id):

    return stub_volume(volume_id)

**** CubicPower OpenStack Study ****

def stub_volume_get_notfound(self, context, volume_id):

    raise exc.NotFound

**** CubicPower OpenStack Study ****

def stub_volume_get_db(context, volume_id):

    return stub_volume(volume_id)

**** CubicPower OpenStack Study ****

def stub_volume_get_all(context, search_opts=None, marker=None, limit=None,

                        sort_key='created_at', sort_dir='desc', filters=None):

    return [stub_volume(100, project_id='fake'),

            stub_volume(101, project_id='superfake'),

            stub_volume(102, project_id='superduperfake')]

**** CubicPower OpenStack Study ****

def stub_volume_get_all_by_project(self, context, marker, limit, sort_key,

                                   sort_dir, filters={}):

    return [stub_volume_get(self, context, '1')]

**** CubicPower OpenStack Study ****

def stub_snapshot(id, **kwargs):

    snapshot = {'id': id,

                'volume_id': 12,

                'status': 'available',

                'volume_size': 100,

                'created_at': None,

                'display_name': 'Default name',

                'display_description': 'Default description',

                'project_id': 'fake'}

    snapshot.update(kwargs)

    return snapshot

**** CubicPower OpenStack Study ****

def stub_snapshot_get_all(self):

    return [stub_snapshot(100, project_id='fake'),

            stub_snapshot(101, project_id='superfake'),

            stub_snapshot(102, project_id='superduperfake')]

**** CubicPower OpenStack Study ****

def stub_snapshot_get_all_by_project(self, context):

    return [stub_snapshot(1)]

**** CubicPower OpenStack Study ****

def stub_snapshot_update(self, context, *args, **param):

    pass

**** CubicPower OpenStack Study ****

def stub_service_get_all_by_topic(context, topic):

    return [{'availability_zone': "zone1:host1", "disabled": 0}]