@

Home 

OpenStack Study: cinder

OpenStack Index

Previous

Next

        def fake_reserve(context, expire=None, project_id=None, **deltas):

        def fake_commit_and_rollback(context, reservations, project_id=None):

    def test_create_delete_volume(self):

        def fake_reserve(context, expire=None, project_id=None, **deltas):

        def fake_commit(context, reservations, project_id=None):

        def fake_rollback(context, reservations, project_id=None):

    def test_create_delete_volume_with_metadata(self):

    def test_create_volume_with_invalid_metadata(self):

    def test_create_volume_uses_default_availability_zone(self):

        def fake_list_availability_zones():

    def test_create_volume_with_volume_type(self):

        def fake_reserve(context, expire=None, project_id=None, **deltas):

        def fake_commit(context, reservations, project_id=None):

        def fake_rollback(context, reservations, project_id=None):

    def test_create_volume_with_encrypted_volume_type(self):

    def test_create_delete_volume_with_encrypted_volume_type(self):

    def test_extra_capabilities(self):

    def test_extra_capabilities_fail(self):

    def test_delete_busy_volume(self):

    def test_delete_volume_in_error_extending(self):

    def test_create_volume_from_snapshot(self):

    def test_create_volume_from_snapshot_with_types(self, _get_flow):

    def test_create_volume_from_source_with_types(self, _get_flow):

    def test_create_snapshot_driver_not_initialized(self):

    def _mock_synchronized(self, name, *s_args, **s_kwargs):

        def inner_sync1(f):

    def test_create_volume_from_snapshot_check_locks(self):

        def mock_flow_run(*args, **kwargs):

    def test_create_volume_from_volume_check_locks(self):

        def mock_flow_run(*args, **kwargs):

    def test_create_volume_from_volume_delete_lock_taken(self):

        def mock_elevated(*args, **kwargs):

    def test_create_volume_from_snapshot_delete_lock_taken(self):

        def mock_elevated(*args, **kwargs):

    def test_create_volume_from_snapshot_with_encryption(self):

    def test_create_volume_from_encrypted_volume(self):

    def test_create_volume_from_snapshot_fail_bad_size(self):

    def test_create_volume_from_snapshot_fail_wrong_az(self):

        def fake_list_availability_zones():

    def test_create_volume_with_invalid_exclusive_options(self):

    def test_initialize_connection_fetchqos(self, _mock_volume_update, _mock_volume_get, _mock_volume_admin_metadata_get):

    def test_run_attach_detach_volume_for_instance(self):

    def test_run_attach_detach_volume_for_host(self):

    def test_run_attach_detach_volume_with_attach_mode(self):

    def test_run_manager_attach_detach_volume_with_wrong_attach_mode(self):

    def test_run_api_attach_detach_volume_with_wrong_attach_mode(self):

    def test_reserve_volume_success(self, volume_get, volume_update):

    def test_reserve_volume_bad_status(self):

    def test_unreserve_volume_success(self):

    def test_concurrent_volumes_get_different_targets(self):

        def _check(volume_id):

    def test_multi_node(self):

    def _create_snapshot(volume_id, size='0', metadata=None):

    def test_create_delete_snapshot(self):

    def test_create_delete_snapshot_with_metadata(self):

    def test_cannot_delete_volume_in_use(self):

    def test_force_delete_volume(self):

    def test_cannot_force_delete_attached_volume(self):

    def test_cannot_delete_volume_with_snapshots(self):

    def test_can_delete_errored_snapshot(self):

    def test_create_snapshot_force(self):

    def test_delete_busy_snapshot(self):

    def test_delete_no_dev_fails(self):

    def _create_volume_from_image(self, fakeout_copy_image_to_volume=False, fakeout_clone_image=False):

        def fake_local_path(volume):

        def fake_copy_image_to_volume(context, volume, image_service, image_id):

        def fake_fetch_to_raw(ctx, image_service, image_id, path, blocksize, size=None):

        def fake_clone_image(volume_ref, image_location, image_id, image_meta):

    def test_create_volume_from_image_cloned_status_available(self):

    def test_create_volume_from_image_not_cloned_status_available(self):

    def test_create_volume_from_image_exception(self):

    def test_create_volume_from_exact_sized_image(self):

    def test_create_volume_from_oversized_image(self):

    def test_create_volume_with_mindisk_error(self):

    def _do_test_create_volume_with_size(self, size):

        def fake_reserve(context, expire=None, project_id=None, **deltas):

        def fake_commit(context, reservations, project_id=None):

        def fake_rollback(context, reservations, project_id=None):

    def test_create_volume_int_size(self):

    def test_create_volume_string_size(self):

    def test_create_volume_with_bad_size(self):

        def fake_reserve(context, expire=None, project_id=None, **deltas):

        def fake_commit(context, reservations, project_id=None):

        def fake_rollback(context, reservations, project_id=None):

    def test_begin_roll_detaching_volume(self):

    def test_volume_api_update(self):

    def test_volume_api_update_snapshot(self):

    def test_extend_volume(self, reserve):

    def test_extend_volume_driver_not_initialized(self):

    def test_extend_volume_manager(self):

        def fake_extend(volume, new_size):

    def test_create_volume_from_unelevated_context(self):

        def fake_create_volume(*args, **kwargs):

    def test_create_volume_from_sourcevol(self):

        def fake_create_cloned_volume(volume, src_vref):

    def test_create_volume_from_sourcevol_fail_wrong_az(self):

        def fake_list_availability_zones():

    def test_create_volume_from_sourcevol_with_glance_metadata(self):

        def fake_create_cloned_volume(volume, src_vref):

    def test_create_volume_from_sourcevol_failed_clone(self):

        def fake_error_create_cloned_volume(volume, src_vref):

    def test_list_availability_zones_enabled_service(self):

        def stub_service_get_all_by_topic(*args, **kwargs):

    def test_migrate_volume_driver(self):

    def test_migrate_volume_generic(self):

        def fake_migr(vol, host):

        def fake_delete_volume_rpc(self, ctxt, vol_id):

        def fake_create_volume(self, ctxt, volume, host, req_spec, filters, allow_reschedule=True):

    def _retype_volume_exec(self, driver, snap=False, policy='on-demand', migrate_exc=False, exc=None, diff_equal=False):

    def test_retype_volume_driver_success(self):

    def test_retype_volume_migration_bad_policy(self):

    def test_retype_volume_migration_with_snaps(self):

    def test_retype_volume_migration_failed(self):

    def test_retype_volume_migration_success(self):

    def test_retype_volume_migration_equal_types(self):

    def test_migrate_driver_not_initialized(self):

    def test_update_volume_readonly_flag(self):

class CopyVolumeToImageTestCase(BaseVolumeTestCase):

    def fake_local_path(self, volume):

    def setUp(self):

    def tearDown(self):

    def test_copy_volume_to_image_status_available(self):

    def test_copy_volume_to_image_status_use(self):

    def test_copy_volume_to_image_exception(self):

class GetActiveByWindowTestCase(BaseVolumeTestCase):

    def setUp(self):

    def test_volume_get_active_by_window(self):

    def test_snapshot_get_active_by_window(self):

class DriverTestCase(test.TestCase):

    def setUp(self):

        def _fake_execute(_command, *_args, **_kwargs):

    def tearDown(self):

    def fake_get_target(obj, iqn):

    def _attach_volume(self):

    def _detach_volume(self, volume_id_list):

class GenericVolumeDriverTestCase(DriverTestCase):

    def test_backup_volume(self):

    def test_restore_backup(self):

class LVMISCSIVolumeDriverTestCase(DriverTestCase):

    def test_delete_busy_volume(self):

    def test_lvm_migrate_volume_no_loc_info(self):

    def test_lvm_migrate_volume_bad_loc_info(self):

    def test_lvm_migrate_volume_diff_driver(self):

    def test_lvm_migrate_volume_diff_host(self):

    def test_lvm_migrate_volume_in_use(self):

    def test_lvm_volume_group_missing(self):

        def get_all_volume_groups():

    def test_lvm_migrate_volume_proceed(self):

        def fake_execute(*args, **kwargs):

        def get_all_volume_groups():

        def _fake_get_all_physical_volumes(obj, root_helper, vg_name):

    def _get_manage_existing_lvs(name):

    def _setup_stubs_for_manage_existing(self):

    def test_lvm_manage_existing(self):

        def _rename_volume(old_name, new_name):

    def test_lvm_manage_existing_bad_size(self):

    def test_lvm_manage_existing_bad_ref(self):

class LVMVolumeDriverTestCase(DriverTestCase):

    def test_delete_volume_invalid_parameter(self):

    def test_delete_volume_bad_path(self):

    def test_delete_volume_thinlvm_snap(self):

class ISCSITestCase(DriverTestCase):

    def setUp(self):

    def _attach_volume(self):

    def test_do_iscsi_discovery(self):

    def test_get_iscsi_properties(self):

    def test_get_volume_stats(self):

        def _fake_get_all_physical_volumes(obj, root_helper, vg_name):

        def _fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):

    def test_validate_connector(self):

class ISERTestCase(ISCSITestCase):

    def setUp(self):

    def test_get_volume_stats(self):

        def _fake_get_all_physical_volumes(obj, root_helper, vg_name):

        def _fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):

    def test_get_volume_stats2(self):

class FibreChannelTestCase(DriverTestCase):

    def test_initialize_connection(self):

class VolumePolicyTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _set_rules(self, rules):

    def test_check_policy(self):

    def test_check_policy_with_target(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_configuration.py

class VolumeConfigurationTest(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_group_grafts_opts(self):

    def test_opts_no_group(self):

    def test_grafting_multiple_opts(self):

    def test_safe_get(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_glance_metadata.py

class VolumeGlanceMetadataTestCase(test.TestCase):

    def setUp(self):

    def test_vol_glance_metadata_bad_vol_id(self):

    def test_vol_update_glance_metadata(self):

    def test_vols_get_glance_metadata(self):

    def _assert_metadata_equals(self, volume_id, key, value, observed):

    def test_vol_delete_glance_metadata(self):

    def test_vol_glance_metadata_copy_to_snapshot(self):

    def test_vol_glance_metadata_copy_from_volume_to_volume(self):

    def test_volume_glance_metadata_copy_to_volume(self):

    def test_volume_snapshot_glance_metadata_get_nonexistent(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_rpcapi.py

class VolumeRpcAPITestCase(test.TestCase):

    def setUp(self):

    def test_serialized_volume_has_id(self):

    def _test_volume_api(self, method, rpc_method, **kwargs):

        def _fake_prepare_method(*args, **kwds):

        def _fake_rpc_method(*args, **kwargs):

    def test_create_volume(self):

    def test_create_volume_serialization(self):

    def test_delete_volume(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_attach_volume_to_instance(self):

    def test_attach_volume_to_host(self):

    def test_detach_volume(self):

    def test_copy_volume_to_image(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_accept_transfer(self):

    def test_extend_volume(self):

    def test_migrate_volume(self):

    def test_migrate_volume_completion(self):

    def test_retype(self):

    def test_manage_existing(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_transfer.py

class VolumeTransferTestCase(test.TestCase):

    def setUp(self):

    def test_transfer_volume_create_delete(self):

    def test_transfer_invalid_volume(self):

    def test_transfer_accept(self):

    def test_transfer_get(self):

    def test_delete_transfer_with_deleted_volume(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_types.py

class VolumeTypeTestCase(test.TestCase):

    def setUp(self):

    def test_volume_type_create_then_destroy(self):

    def test_get_all_volume_types(self):

    def test_get_default_volume_type(self):

    def test_default_volume_type_missing_in_db(self):

    def test_non_existent_vol_type_shouldnt_delete(self):

    def test_volume_type_with_volumes_shouldnt_delete(self):

    def test_repeated_vol_types_shouldnt_raise(self):

    def test_invalid_volume_types_params(self):

    def test_volume_type_get_by_id_and_name(self):

    def test_volume_type_search_by_extra_spec(self):

    def test_volume_type_search_by_extra_spec_multiple(self):

    def test_is_encrypted(self):

    def test_get_volume_type_qos_specs(self):

    def test_volume_types_diff(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_types_extra_specs.py

class VolumeTypeExtraSpecsTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_volume_type_specs_get(self):

    def test_volume_type_extra_specs_delete(self):

    def test_volume_type_extra_specs_update(self):

    def test_volume_type_extra_specs_create(self):

    def test_volume_type_get_with_extra_specs(self):

    def test_volume_type_get_by_name_with_extra_specs(self):

    def test_volume_type_get_all(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume_utils.py

class UsageInfoTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _create_volume(self, params={}):

class LVMVolumeDriverTestCase(test.TestCase):

    def test_convert_blocksize_option(self):

class ClearVolumeTestCase(test.TestCase):

    def test_clear_volume(self):

    def test_clear_volume_zero(self):

    def test_clear_volume_ionice(self):

    def test_clear_volume_zero_ionice(self):

    def test_clear_volume_shred(self):

    def test_clear_volume_shred_not_clear_size(self):

    def test_clear_volume_invalid_opt(self):

    def test_clear_volume_lvm_snap(self):

        def fake_copy_volume(srcstr, deststr, size, blocksize, **kwargs):

\OpenStack\cinder-2014.1\cinder\tests\test_windows.py

class TestWindowsDriver(test.TestCase):

    def __init__(self, method):

    def setUp(self):

    def tearDown(self):

    def _setup_stubs(self):

        def fake_wutils__init__(self):

    def fake_local_path(self, volume):

    def test_check_for_setup_errors(self):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_create_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_delete_snapshot(self):

    def test_create_export(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_ensure_export(self):

    def test_remove_export(self):

    def test_copy_image_to_volume(self):

    def test_copy_volume_to_image(self):

    def test_create_cloned_volume(self):

    def test_extend_volume(self):

\OpenStack\cinder-2014.1\cinder\tests\test_wsgi.py

class TestLoaderNothingExists(test.TestCase):

    def setUp(self):

    def test_config_not_found(self):

class TestLoaderNormalFilesystem(test.TestCase):

    def setUp(self):

    def test_config_found(self):

    def test_app_not_found(self):

    def test_app_found(self):

class TestWSGIServer(test.TestCase):

    def _ipv6_configured():

    def test_no_app(self):

    def test_start_random_port(self):

    def test_start_random_port_with_ipv6(self):

    def test_app(self):

        def hello_world(env, start_response):

    def test_app_using_ssl(self):

        def hello_world(req):

    def test_app_using_ipv6_and_ssl(self):

        def hello_world(req):

class ExceptionTest(test.TestCase):

    def _wsgi_app(self, inner_app):

    def _do_test_exception_safety_reflected_in_faults(self, expose):

        def fail(req):

    def test_safe_exceptions_are_described_in_faults(self):

    def test_unsafe_exceptions_are_not_described_in_faults(self):

    def _do_test_exception_mapping(self, exception_type, msg):

        def fail(req):

    def test_quota_error_mapping(self):

    def test_non_cinder_notfound_exception_mapping(self):

    def test_non_cinder_exception_mapping(self):

    def test_exception_with_none_code_throws_500(self):

        def fail(req):

    def test_cinder_exception_with_localized_explanation(self, mock_t9n):

        def fail(req):

        def mock_get_non_localized_message(msgid, locale):

        def mock_translate(msgid, locale):

\OpenStack\cinder-2014.1\cinder\tests\test_xenapi_sm.py

class MockContext(object):

    def __init__(ctxt, auth_token):

def simple_context(value):

def get_configured_driver(server='ignore_server', path='ignore_path'):

class DriverTestCase(test.TestCase):

    def assert_flag(self, flagname):

    def test_config_options(self):

    def test_do_setup(self):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_create_export_does_not_raise_exception(self):

    def test_remove_export_does_not_raise_exception(self):

    def test_initialize_connection(self):

    def test_initialize_connection_null_values(self):

    def _setup_mock_driver(self, server, serverpath, sr_base_path="_srbp"):

    def test_create_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_delete_snapshot(self):

    def test_copy_volume_to_image_xenserver_case(self):

    def test_copy_volume_to_image_non_xenserver_case(self):

    def test_use_image_utils_to_upload_volume(self):

    def test_use_glance_plugin_to_upload_volume(self):

    def test_copy_image_to_volume_xenserver_case(self):

    def test_copy_image_to_volume_non_xenserver_case(self):

    def test_use_image_utils_to_pipe_bytes_to_volume(self):

    def test_use_glance_plugin_to_copy_image_to_volume_success(self):

    def test_use_glance_plugin_to_copy_image_to_volume_fail(self):

    def test_get_volume_stats_reports_required_keys(self):

    def test_get_volume_stats_reports_unknown_cap(self):

    def test_reported_driver_type(self):

class ToolsTest(test.TestCase):

    def test_get_this_vm_uuid(self, mock_read_first_line):

    def test_stripped_first_line_of(self):

\OpenStack\cinder-2014.1\cinder\tests\test_zadara.py

class FakeRequest(object):

    def __init__(self, method, url, body):

    def read(self):

    def _compare_url(self, url, template_url):

    def _get_parameters(self, data):

    def _get_counter(self):

    def _login(self):

    def _incorrect_access_key(self, params):

    def _create_volume(self):

    def _create_server(self):

    def _attach(self):

    def _detach(self):

    def _expand(self):

    def _create_snapshot(self):

    def _delete_snapshot(self):

    def _create_clone(self):

    def _delete(self):

    def _generate_list_resp(self, header, footer, body, lst, vol):

    def _list_volumes(self):

    def _list_controllers(self):

    def _list_pools(self):

    def _list_servers(self):

    def _get_server_obj(self, name):

    def _list_vol_attachments(self):

    def _list_vol_snapshots(self):

class FakeHTTPConnection(object):

    def __init__(self, host, port, use_ssl=False):

    def request(self, method, url, body):

    def getresponse(self):

    def close(self):

class FakeHTTPSConnection(FakeHTTPConnection):

    def __init__(self, host, port):

class ZadaraVPSADriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_create_destroy(self):

    def test_create_destroy_multiple(self):

    def test_destroy_non_existent(self):

    def test_empty_apis(self):

    def test_volume_attach_detach(self):

    def test_volume_attach_multiple_detach(self):

    def test_wrong_attach_params(self):

    def test_wrong_detach_params(self):

    def test_wrong_login_reply(self):

    def test_ssl_use(self):

    def test_bad_http_response(self):

    def test_delete_without_detach(self):

    def test_no_active_ctrl(self):

    def test_create_destroy_snapshot(self):

    def test_expand_volume(self):

    def test_create_destroy_clones(self):

    def test_get_volume_stats(self):

\OpenStack\cinder-2014.1\cinder\tests\utils.py

def get_test_admin_context():

def create_volume(ctxt, host='test_host', display_name='test_volume', display_description='this is a test volume', status='available', migration_status=None, size=1, availability_zone='fake_az', volume_type_id=None, **kwargs):

def create_snapshot(ctxt, volume_id, display_name='test_snapshot', display_description='this is a test snapshot', status='creating'):

\OpenStack\cinder-2014.1\cinder\tests\volume\drivers\netapp\test_iscsi.py

class NetAppDirectISCSIDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_create_lun(self):

    def test_create_lun_with_qos_policy_group(self):

class NetAppiSCSICModeTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_clone_lun_multiple_zapi_calls(self):

    def test_clone_lun_zero_block_count(self):

class NetAppiSCSI7ModeTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_clone_lun_multiple_zapi_calls(self):

    def test_clone_lun_zero_block_count(self):

\OpenStack\cinder-2014.1\cinder\tests\volume\drivers\netapp\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\volume\drivers\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\volume\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\windows\db_fakes.py

def get_fake_volume_info():

def get_fake_volume_info_cloned():

def get_fake_image_meta():

def get_fake_snapshot_info():

def get_fake_connector_info():

\OpenStack\cinder-2014.1\cinder\tests\windows\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\xenapi\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_san_lookup_service.py

class TestBrcdFCSanLookupService(brcd_lookup.BrcdFCSanLookupService, test.TestCase):

    def setUp(self):

    def __init__(self, *args, **kwargs):

    def create_configuration(self):

    def test_get_device_mapping_from_network(self, get_nameserver_info_mock):

    def test_get_nameserver_info(self, get_switch_data_mock):

    def test__get_switch_data(self):

    def test__parse_ns_output(self):

    def test_get_formatted_wwn(self):

class Channel(object):

    def recv_exit_status(self):

class Stream(object):

    def __init__(self, buffer=''):

    def readlines(self):

    def close(self):

    def flush(self):

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_zone_client_cli.py

class TestBrcdFCZoneClientCLI(BrcdFCZoneClientCLI, test.TestCase):

    def setUp(self):

    def __init__(self, *args, **kwargs):

    def test_get_active_zone_set(self, get_switch_info_mock):

    def test_get_active_zone_set_ssh_error(self, run_ssh_mock):

    def test_add_zones_new_zone_no_activate(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock):

    def test_add_zones_new_zone_activate(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock, activate_zoneset_mock):

    def test_activate_zoneset(self, ssh_execute_mock):

    def test_deactivate_zoneset(self, ssh_execute_mock):

    def test_delete_zones_activate_false(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock):

    def test_delete_zones_activate_true(self, get_active_zs_mock, apply_zone_change_mock, cfg_save_mock, activate_zs_mock):

    def test_get_nameserver_info(self, get_switch_info_mock):

    def test_get_nameserver_info_ssh_error(self, run_ssh_mock):

    def test__cfg_save(self, ssh_execute_mock):

    def test__zone_delete(self, apply_zone_change_mock):

    def test__cfg_trans_abort(self, apply_zone_change_mock):

    def test__is_trans_abortable_true(self, run_ssh_mock):

    def test__is_trans_abortable_ssh_error(self, run_ssh_mock):

    def test__is_trans_abortable_false(self, run_ssh_mock):

    def test_apply_zone_change(self, run_ssh_mock):

    def test__get_switch_info(self, run_ssh_mock):

    def test__parse_ns_output(self):

    def test_is_supported_firmware(self, exec_shell_cmd_mock):

    def test_is_supported_firmware_invalid(self, exec_shell_cmd_mock):

    def test_is_supported_firmware_no_ssh_response(self, exec_shell_cmd_mock):

    def test_is_supported_firmware_ssh_error(self, exec_shell_cmd_mock):

class Channel(object):

    def recv_exit_status(self):

class Stream(object):

    def __init__(self, buffer=''):

    def readlines(self):

    def splitlines(self):

    def close(self):

    def flush(self):

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_fc_zone_driver.py

class BrcdFcZoneDriverBaseTest(object):

    def setup_config(self, is_normal, mode):

class TestBrcdFcZoneDriver(BrcdFcZoneDriverBaseTest, test.TestCase):

    def setUp(self):

    def setup_driver(self, config):

    def fake_get_active_zone_set(self, fabric_ip, fabric_user, fabric_pwd):

    def fake_get_san_context(self, target_wwn_list):

    def test_add_connection(self):

    def test_delete_connection(self):

    def test_add_connection_for_initiator_mode(self):

    def test_delete_connection_for_initiator_mode(self):

    def test_add_connection_for_invalid_fabric(self):

    def test_delete_connection_for_invalid_fabric(self):

class FakeBrcdFCZoneClientCLI(object):

    def __init__(self, ipaddress, username, password, port):

    def get_active_zone_set(self):

    def add_zones(self, zones, isActivate):

    def delete_zones(self, zone_names, isActivate):

    def is_supported_firmware(self):

    def get_nameserver_info(self):

    def close_connection(self):

    def cleanup(self):

class FakeBrcdFCSanLookupService(object):

    def get_device_mapping_from_network(self, initiator_wwn_list, target_wwn_list):

class GlobalVars(object):

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_brcd_lookup_service.py

class TestFCSanLookupService(FCSanLookupService, test.TestCase):

    def setUp(self):

    def __init__(self, *args, **kwargs):

    def setup_config(self):

    def test_get_device_mapping_from_network(self):

    def test_get_device_mapping_from_network_for_invalid_config(self):

class FakeBrcdFCSanLookupService(object):

    def __init__(self, **kwargs):

    def get_device_mapping_from_network(self, initiator_wwn_list, target_wwn_list):

class GlobalParams(object):

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_fc_zone_manager.py

class TestFCZoneManager(ZoneManager, test.TestCase):

    def setUp(self):

    def __init__(self, *args, **kwargs):

    def test_add_connection(self):

    def test_add_connection_error(self):

    def test_delete_connection(self):

    def test_delete_connection_error(self):

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\test_volume_manager_fc.py

class TestVolumeManager(manager.VolumeManager, test.TestCase):

    def setUp(self):

    def tearDown(self):

    def __init__(self, *args, **kwargs):

    def test_initialize_connection_voltype_fc_mode_fabric(self, utils_mock):

    def test_initialize_connection_voltype_fc_mode_none(self, utils_mock):

    def test_terminate_connection_exception(self):

    def test_terminate_connection_voltype_fc_mode_fabric(self, utils_mock):

    def test_terminate_connection_mode_none(self, utils_mock):

    def test_terminate_connection_conn_info_none(self, utils_mock):

    def test__add_or_delete_connection_add(self, add_connection_mock):

    def test__add_or_delete_connection_delete(self, delete_connection_mock):

    def test__add_or_delete_connection_no_init_target_map(self, del_conn_mock):

\OpenStack\cinder-2014.1\cinder\tests\zonemanager\__init__.py

\OpenStack\cinder-2014.1\cinder\tests\__init__.py

\OpenStack\cinder-2014.1\cinder\transfer\api.py

class API(base.Base):

    def __init__(self, db_driver=None):

    def get(self, context, transfer_id):

    def delete(self, context, transfer_id):

    def get_all(self, context, filters={}):

    def _get_random_string(self, length):

    def _get_crypt_hash(self, salt, auth_key):

    def create(self, context, volume_id, display_name):

    def accept(self, context, transfer_id, auth_key):

\OpenStack\cinder-2014.1\cinder\transfer\__init__.py

\OpenStack\cinder-2014.1\cinder\units.py

\OpenStack\cinder-2014.1\cinder\utils.py

def find_config(config_path):

def as_int(obj, quiet=True):

def check_exclusive_options(**kwargs):

def execute(*cmd, **kwargs):

def check_ssh_injection(cmd_list):

def create_channel(client, width, height):

class SSHPool(pools.Pool):

    def __init__(self, ip, port, conn_timeout, login, password=None, privatekey=None, *args, **kwargs):

    def create(self):

    def get(self):

    def remove(self, ssh):

def cinderdir():

def last_completed_audit_period(unit=None):

def generate_password(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):

def generate_username(length=20, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):

class LazyPluggable(object):

    def __init__(self, pivot, **backends):

    def __get_backend(self):

    def __getattr__(self, key):

class ProtectedExpatParser(expatreader.ExpatParser):

    def __init__(self, forbid_dtd=True, forbid_entities=True, *args, **kwargs):

    def start_doctype_decl(self, name, sysid, pubid, has_internal_subset):

    def entity_decl(self, entityName, is_parameter_entity, value, base, systemId, publicId, notationName):

    def unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):

    def reset(self):

def safe_minidom_parse_string(xml_string):

def xhtml_escape(value):

def get_from_path(items, path):

def is_valid_boolstr(val):

def monkey_patch():

def generate_glance_url():

def make_dev_path(dev, partition=None, base='/dev'):

def total_seconds(td):

def sanitize_hostname(hostname):

def read_cached_file(filename, cache_info, reload_func=None):

def hash_file(file_like_object):

def service_is_up(service):

def read_file_as_root(file_path):

def temporary_chown(path, owner_uid=None):

def tempdir(**kwargs):

def walk_class_hierarchy(clazz, encountered=None):

def get_root_helper():

def brick_get_connector_properties():

def brick_get_connector(protocol, driver=None, execute=processutils.execute, use_multipath=False, device_scan_attempts=3, *args, **kwargs):

def require_driver_initialized(driver):

def get_file_mode(path):

def get_file_gid(path):

def check_string_length(value, name, min_length=0, max_length=None):

def add_visible_admin_metadata(context, volume, volume_api):

\OpenStack\cinder-2014.1\cinder\version.py

\OpenStack\cinder-2014.1\cinder\volume\api.py

def wrap_check_policy(func):

 def wrapped(self, context, target_obj, *args, **kwargs):

def check_policy(context, action, target_obj=None):

class API(base.Base):

    def __init__(self, db_driver=None, image_service=None):

    def _valid_availability_zone(self, availability_zone):

    def list_availability_zones(self):

    def create(self, context, size, name, description, snapshot=None, image_id=None, volume_type=None, metadata=None, availability_zone=None, source_volume=None, scheduler_hints=None, backup_source_volume=None):

        def check_volume_az_zone(availability_zone):

    def delete(self, context, volume, force=False, unmanage_only=False):

    def update(self, context, volume, fields):

    def get(self, context, volume_id):

    def get_all(self, context, marker=None, limit=None, sort_key='created_at', sort_dir='desc', filters=None):

    def get_snapshot(self, context, snapshot_id):

    def get_volume(self, context, volume_id):

    def get_all_snapshots(self, context, search_opts=None):

    def check_attach(self, volume):

    def check_detach(self, volume):

    def reserve_volume(self, context, volume):

    def unreserve_volume(self, context, volume):

    def begin_detaching(self, context, volume):

    def roll_detaching(self, context, volume):

    def attach(self, context, volume, instance_uuid, host_name, mountpoint, mode):

    def detach(self, context, volume):

    def initialize_connection(self, context, volume, connector):

    def terminate_connection(self, context, volume, connector, force=False):

    def accept_transfer(self, context, volume, new_user, new_project):

    def _create_snapshot(self, context, volume, name, description, force=False, metadata=None):

    def create_snapshot(self, context, volume, name, description, metadata=None):

    def create_snapshot_force(self, context, volume, name, description, metadata=None):

    def delete_snapshot(self, context, snapshot, force=False):

    def update_snapshot(self, context, snapshot, fields):

    def get_volume_metadata(self, context, volume):

    def delete_volume_metadata(self, context, volume, key):

    def _check_metadata_properties(self, metadata=None):

    def update_volume_metadata(self, context, volume, metadata, delete=False):

    def get_volume_metadata_value(self, volume, key):

    def get_volume_admin_metadata(self, context, volume):

    def delete_volume_admin_metadata(self, context, volume, key):

    def update_volume_admin_metadata(self, context, volume, metadata, delete=False):

    def get_snapshot_metadata(self, context, snapshot):

    def delete_snapshot_metadata(self, context, snapshot, key):

    def update_snapshot_metadata(self, context, snapshot, metadata, delete=False):

    def get_snapshot_metadata_value(self, snapshot, key):

    def get_volumes_image_metadata(self, context):

    def get_volume_image_metadata(self, context, volume):

    def _check_volume_availability(self, volume, force):

    def copy_volume_to_image(self, context, volume, metadata, force):

    def extend(self, context, volume, new_size):

    def migrate_volume(self, context, volume, host, force_host_copy):

    def migrate_volume_completion(self, context, volume, new_volume, error):

    def update_readonly_flag(self, context, volume, flag):

    def retype(self, context, volume, new_type, migration_policy=None):

    def manage_existing(self, context, host, ref, name=None, description=None, volume_type=None, metadata=None, availability_zone=None):

class HostAPI(base.Base):

    def __init__(self):

    def set_host_enabled(self, context, host, enabled):

    def get_host_uptime(self, context, host):

    def host_power_action(self, context, host, action):

    def set_host_maintenance(self, context, host, mode):

\OpenStack\cinder-2014.1\cinder\volume\configuration.py

class Configuration(object):

    def __init__(self, volume_opts, config_group=None):

    def _ensure_config_values(self, volume_opts):

    def append_config_values(self, volume_opts):

    def safe_get(self, value):

    def __getattr__(self, value):

\OpenStack\cinder-2014.1\cinder\volume\driver.py

class VolumeDriver(object):

    def __init__(self, execute=utils.execute, *args, **kwargs):

    def set_execute(self, execute):

    def set_initialized(self):

    def initialized(self):

    def get_version(self):

    def _is_non_recoverable(self, err, non_recoverable_list):

    def _try_execute(self, *command, **kwargs):

    def check_for_setup_error(self):

    def create_volume(self, volume):

    def create_volume_from_snapshot(self, volume, snapshot):

    def create_cloned_volume(self, volume, src_vref):

    def delete_volume(self, volume):

    def create_snapshot(self, snapshot):

    def delete_snapshot(self, snapshot):

    def local_path(self, volume):

    def ensure_export(self, context, volume):

    def create_export(self, context, volume):

    def remove_export(self, context, volume):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector, **kwargs):

    def attach_volume(self, context, volume, instance_uuid, host_name, mountpoint):

    def detach_volume(self, context, volume):

    def get_volume_stats(self, refresh=False):

    def do_setup(self, context):

    def validate_connector(self, connector):

    def copy_volume_data(self, context, src_vol, dest_vol, remote=None):

    def copy_image_to_volume(self, context, volume, image_service, image_id):

    def copy_volume_to_image(self, context, volume, image_service, image_meta):

    def _attach_volume(self, context, volume, properties, remote=False):

    def _detach_volume(self, context, attach_info, volume, properties, force=False, remote=False):

    def clone_image(self, volume, image_location, image_id, image_meta):

    def backup_volume(self, context, backup, backup_service):

    def restore_backup(self, context, backup, volume, backup_service):

    def clear_download(self, context, volume):

    def extend_volume(self, volume, new_size):

    def migrate_volume(self, context, volume, host):

    def retype(self, context, volume, new_type, diff, host):

    def accept_transfer(self, context, volume, new_user, new_project):

    def manage_existing(self, volume, existing_ref):

    def manage_existing_get_size(self, volume, existing_ref):

    def unmanage(self, volume):

class ISCSIDriver(VolumeDriver):

    def __init__(self, *args, **kwargs):

    def _do_iscsi_discovery(self, volume):

    def _get_iscsi_properties(self, volume):

    def _run_iscsiadm(self, iscsi_properties, iscsi_command, **kwargs):

    def _run_iscsiadm_bare(self, iscsi_command, **kwargs):

    def _iscsiadm_update(self, iscsi_properties, property_key, property_value, **kwargs):

    def initialize_connection(self, volume, connector):

    def validate_connector(self, connector):

    def terminate_connection(self, volume, connector, **kwargs):

    def get_volume_stats(self, refresh=False):

    def _update_volume_stats(self):

    def get_target_helper(self, db):

class FakeISCSIDriver(ISCSIDriver):

    def __init__(self, *args, **kwargs):

    def create_volume(self, volume):

    def check_for_setup_error(self):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector, **kwargs):

    def fake_execute(cmd, *_args, **_kwargs):

class ISERDriver(ISCSIDriver):

    def __init__(self, *args, **kwargs):

    def initialize_connection(self, volume, connector):

    def _update_volume_stats(self):

    def get_target_helper(self, db):

class FakeISERDriver(FakeISCSIDriver):

    def __init__(self, *args, **kwargs):

    def initialize_connection(self, volume, connector):

    def fake_execute(cmd, *_args, **_kwargs):

class FibreChannelDriver(VolumeDriver):

    def __init__(self, *args, **kwargs):

    def initialize_connection(self, volume, connector):

\OpenStack\cinder-2014.1\cinder\volume\drivers\block_device.py

class BlockDeviceDriver(driver.ISCSIDriver):

    def __init__(self, *args, **kwargs):

    def set_execute(self, execute):

    def check_for_setup_error(self):

    def create_volume(self, volume):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector, **kwargs):

    def create_export(self, context, volume):

    def remove_export(self, context, volume):

    def ensure_export(self, context, volume):

    def delete_volume(self, volume):

    def local_path(self, volume):

    def copy_image_to_volume(self, context, volume, image_service, image_id):

    def copy_volume_to_image(self, context, volume, image_service, image_meta):

    def create_cloned_volume(self, volume, src_vref):

    def get_volume_stats(self, refresh=False):

    def _update_volume_stats(self):

    def _get_used_devices(self):

    def _get_device_size(self, dev_path):

    def _devices_sizes(self):

    def find_appropriate_size_device(self, size):

\OpenStack\cinder-2014.1\cinder\volume\drivers\coraid.py

class CoraidRESTClient(object):

    def __init__(self, esm_url):

    def _check_esm_url(self, esm_url):

    def rpc(self, handle, url_params, data, allow_empty_response=False):

    def _rpc(self, handle, url_params, data, allow_empty_response):

def to_coraid_kb(gb):

def coraid_volume_size(gb):

class CoraidAppliance(object):

    def __init__(self, rest_client, username, password, group):

    def _login(self):

    def _set_effective_group(self, groups_map, group):

    def _ensure_session(self):

    def _relogin(self):

    def rpc(self, handle, url_params, data, allow_empty_response=False):

    def _is_session_expired(self, reply):

    def _is_bad_config_state(self, reply):

    def configure(self, json_request):

    def esm_command(self, request):

    def get_volume_info(self, volume_name):

    def get_volume_repository(self, volume_name):

    def get_all_repos(self):

    def ping(self):

    def create_lun(self, repository_name, volume_name, volume_size_in_gb):

    def delete_lun(self, volume_name):

    def resize_volume(self, volume_name, new_volume_size_in_gb):

    def create_snapshot(self, volume_name, snapshot_name):

    def delete_snapshot(self, snapshot_name):

    def create_volume_from_snapshot(self, snapshot_name, volume_name, dest_repository_name):

    def clone_volume(self, src_volume_name, dst_volume_name, dst_repository_name):

class CoraidDriver(driver.VolumeDriver):

    def __init__(self, *args, **kwargs):

    def appliance(self):

    def check_for_setup_error(self):

    def _get_repository(self, volume_type):

    def create_volume(self, volume):

    def create_cloned_volume(self, volume, src_vref):

    def delete_volume(self, volume):

    def create_snapshot(self, snapshot):

    def delete_snapshot(self, snapshot):

    def create_volume_from_snapshot(self, volume, snapshot):

    def extend_volume(self, volume, new_size):

    def initialize_connection(self, volume, connector):

    def _get_repository_capabilities(self):

    def update_volume_stats(self):

    def get_volume_stats(self, refresh=False):

    def local_path(self, volume):

    def create_export(self, context, volume):

    def remove_export(self, context, volume):

    def terminate_connection(self, volume, connector, **kwargs):

    def ensure_export(self, context, volume):

\OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_cli_iscsi.py

class EMCCLIISCSIDriver(driver.ISCSIDriver):

    def __init__(self, *args, **kwargs):

    def check_for_setup_error(self):

    def create_volume(self, volume):

    def create_volume_from_snapshot(self, volume, snapshot):

    def create_cloned_volume(self, volume, src_vref):

    def delete_volume(self, volume):

    def create_snapshot(self, snapshot):

    def delete_snapshot(self, snapshot):

    def ensure_export(self, context, volume):

    def create_export(self, context, volume):

    def remove_export(self, context, volume):

    def check_for_export(self, context, volume_id):

    def extend_volume(self, volume, new_size):

    def initialize_connection(self, volume, connector):

        def do_initialize_connection():

    def _do_iscsi_discovery(self, volume):

    def vnx_get_iscsi_properties(self, volume, connector):

    def terminate_connection(self, volume, connector, **kwargs):

        def do_terminate_connection():

    def get_volume_stats(self, refresh=False):

    def update_volume_stats(self):

\OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_smis_common.py

class EMCSMISCommon():

    def __init__(self, prtcl, configuration=None):

    def create_volume(self, volume):

    def create_volume_from_snapshot(self, volume, snapshot):

    def create_cloned_volume(self, volume, src_vref):

    def delete_volume(self, volume):

    def create_snapshot(self, snapshot, volume):

    def delete_snapshot(self, snapshot, volume):

    def _expose_paths(self, configservice, vol_instance, connector):

    def _hide_paths(self, configservice, vol_instance, connector):

    def _add_members(self, configservice, vol_instance):

    def _remove_members(self, configservice, vol_instance):

    def _map_lun(self, volume, connector):

    def _unmap_lun(self, volume, connector):

    def initialize_connection(self, volume, connector):

    def terminate_connection(self, volume, connector):

    def extend_volume(self, volume, new_size):

    def update_volume_stats(self):

    def _get_storage_type(self, volume, filename=None):

    def _get_storage_type_conffile(self, filename=None):

    def _get_masking_view(self, filename=None):

    def _get_timeout(self, filename=None):

    def _get_ecom_cred(self, filename=None):

    def _get_ecom_server(self, filename=None):

    def _get_ecom_connection(self, filename=None):

    def _find_replication_service(self, storage_system):

    def _find_storage_configuration_service(self, storage_system):

    def _find_controller_configuration_service(self, storage_system):

    def _find_storage_hardwareid_service(self, storage_system):

    def _find_pool(self, storage_type, details=False):

    def _parse_pool_instance_id(self, instanceid):

    def _find_lun(self, volume):

    def _find_storage_sync_sv_sv(self, snapshot, volume, waitforsync=True):

    def _find_initiator_names(self, connector):

    def _wait_for_job_complete(self, job):

    def _find_lunmasking_scsi_protocol_controller(self, storage_system, connector):

    def _find_lunmasking_scsi_protocol_controller_for_vol(self, vol_instance, connector):

    def get_num_volumes_mapped(self, volume, connector):

    def _find_avail_device_number(self, storage_system):

    def find_device_number(self, volume, connector):

    def _find_device_masking_group(self):

    def _find_storage_processor_system(self, owningsp, storage_system):

    def _find_iscsi_protocol_endpoints(self, owningsp, storage_system):

    def _getnum(self, num, datatype):

    def _getinstancename(self, classname, bindings):

    def get_target_wwns(self, storage_system, connector):

    def _find_storage_hardwareids(self, connector):

    def _get_volumetype_extraspecs(self, volume):

    def _get_provisioning(self, storage_type):

\OpenStack\cinder-2014.1\cinder\volume\drivers\emc\emc_smis_fc.py

class EMCSMISFCDriver(driver.FibreChannelDriver):

    def __init__(self, *args, **kwargs):

    def check_for_setup_error(self):

    def create_volume(self, volume):

    def create_volume_from_snapshot(self, volume, snapshot):

    def create_cloned_volume(self, volume, src_vref):

    def delete_volume(self, volume):

OpenStack Index

Previous

Next