¡@

Home 

OpenStack Study: cinder

OpenStack Index

Previous

Next

    def test_check_system_pwd_not_sync(self):

        def list_system():

    def test_connect(self):

    def test_create_destroy(self):

    def test_create_vol_snapshot_destroy(self):

    def test_map_unmap(self):

    def test_cloned_volume_destroy(self):

    def test_map_by_creating_host(self):

    def test_vol_stats(self):

    def test_create_vol_snapshot_diff_size_resize(self):

    def test_create_vol_snapshot_diff_size_subclone(self):

\OpenStack\cinder-2014.1\cinder\tests\test_netapp_nfs.py

def create_configuration():

class FakeVolume(object):

    def __init__(self, size=0):

    def __getitem__(self, key):

    def __setitem__(self, key, val):

class FakeSnapshot(object):

    def __init__(self, volume_size=0):

    def __getitem__(self, key):

class FakeResponse(object):

    def __init__(self, status):

class NetappDirectCmodeNfsDriverTestCase(test.TestCase):

    def setUp(self):

    def test_create_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def _prepare_delete_snapshot_mock(self, snapshot_exists):

    def test_delete_existing_snapshot(self):

    def test_delete_missing_snapshot(self):

    def _custom_setup(self):

    def test_check_for_setup_error(self):

    def test_do_setup(self):

    def _prepare_clone_mock(self, status):

    def _prepare_info_by_ip_response(self):

    def test_clone_volume(self):

    def test_register_img_in_cache_noshare(self):

    def test_register_img_in_cache_with_share(self):

    def test_find_image_in_cache_no_shares(self):

    def test_find_image_in_cache_shares(self):

    def test_find_old_cache_files_notexists(self):

    def test_find_old_cache_files_exists(self):

    def test_delete_files_till_bytes_free_success(self):

    def test_clean_image_cache_exec(self):

    def test_clean_image_cache_noexec(self):

    def test_clone_image_fromcache(self):

    def get_img_info(self, format):

    def test_clone_image_cloneableshare_nospace(self):

    def test_clone_image_cloneableshare_raw(self):

    def test_clone_image_cloneableshare_notraw(self):

    def test_clone_image_file_not_discovered(self):

    def test_clone_image_resizefails(self):

    def test_is_cloneable_share_badformats(self):

    def test_is_cloneable_share_goodformat1(self):

    def test_is_cloneable_share_goodformat2(self):

    def test_is_cloneable_share_goodformat3(self):

    def test_is_cloneable_share_goodformat4(self):

    def test_is_cloneable_share_goodformat5(self):

    def test_check_share_in_use_no_conn(self):

    def test_check_share_in_use_invalid_conn(self):

    def test_check_share_in_use_incorrect_host(self):

    def test_check_share_in_use_success(self):

    def test_construct_image_url_loc(self):

    def test_construct_image_url_direct(self):

class NetappDirectCmodeNfsDriverOnlyTestCase(test.TestCase):

    def setUp(self):

    def _custom_setup(self):

    def test_create_volume(self, mock_volume_extra_specs):

    def test_create_volume_with_qos_policy(self, mock_volume_extra_specs):

    def test_copy_img_to_vol_copyoffload_success(self):

    def test_copy_img_to_vol_copyoffload_failure(self):

    def test_copyoffload_frm_cache_success(self):

    def test_copyoffload_frm_img_service_success(self):

    def test_cache_copyoffload_workflow_success(self):

    def test_img_service_raw_copyoffload_workflow_success(self, mock_qemu_img_info):

    def test_img_service_qcow2_copyoffload_workflow_success(self, mock_exists, mock_qemu_img_info, mock_cvrt_image):

class NetappDirect7modeNfsDriverTestCase(NetappDirectCmodeNfsDriverTestCase):

    def _custom_setup(self):

    def _prepare_delete_snapshot_mock(self, snapshot_exists):

    def test_check_for_setup_error_version(self):

    def test_check_for_setup_error(self):

    def test_do_setup(self):

    def _prepare_clone_mock(self, status):

    def test_clone_volume_clear(self):

\OpenStack\cinder-2014.1\cinder\tests\test_netapp_ssc.py

class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

    def log_message(self, format, *args):

class FakeHttplibSocket(object):

    def __init__(self, value):

        def newclose():

    def makefile(self, mode, _other):

class FakeDirectCMODEServerHandler(FakeHTTPRequestHandler):

    def do_GET(s):

    def do_POST(s):

class FakeDirectCmodeHTTPConnection(object):

    def __init__(self, host, timeout=None):

    def request(self, method, path, data=None, headers=None):

    def set_debuglevel(self, level):

    def getresponse(self):

    def getresponsebody(self):

def createNetAppVolume(**kwargs):

class SscUtilsTestCase(test.TestCase):

    def setUp(self):

    def test_cl_vols_ssc_all(self):

    def test_cl_vols_ssc_single(self):

    def test_get_cluster_ssc(self):

    def test_vols_for_boolean_specs(self):

    def test_vols_for_optional_specs(self):

    def test_query_cl_vols_for_ssc(self):

    def test_query_aggr_options(self):

    def test_query_aggr_storage_disk(self):

\OpenStack\cinder-2014.1\cinder\tests\test_nexenta.py

class TestNexentaISCSIDriver(test.TestCase):

    def __init__(self, method):

    def setUp(self):

    def test_setup_error(self):

    def test_setup_error_fail(self):

    def test_local_path(self):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_create_cloned_volume(self):

    def test_migrate_volume(self):

    def test_create_snapshot(self):

    def test_create_volume_from_snapshot(self):

    def test_delete_snapshot(self):

    def _stub_export_method(self, module, method, args, error, raise_exception, fail=False):

    def _stub_all_export_methods(self, fail=False):

    def test_create_export(self):

    def __get_test(i):

        def _test_create_export_fail(self):

    def test_ensure_export(self):

    def test_remove_export(self):

    def test_remove_export_fail_0(self):

    def test_remove_export_fail_1(self):

    def test_get_volume_stats(self):

class TestNexentaJSONRPC(test.TestCase):

    def setUp(self):

    def test_call(self):

    def test_call_deep(self):

    def test_call_auto(self):

    def test_call_error(self):

    def test_call_fail(self):

class TestNexentaNfsDriver(test.TestCase):

    def _create_volume_db_entry(self):

    def setUp(self):

    def test_check_for_setup_error(self):

    def test_initialize_connection(self):

    def test_do_create_volume(self):

    def test_create_sparsed_file(self):

    def test_create_regular_file(self):

    def test_set_rw_permissions_for_all(self):

    def test_local_path(self):

    def test_remote_path(self):

    def test_share_folder(self):

    def test_load_shares_config(self):

    def test_get_capacity_info(self):

    def test_get_share_datasets(self):

    def test_delete_snapshot(self):

    def test_delete_volume(self):

class TestNexentaUtils(test.TestCase):

    def test_str2size(self):

    def test_str2gib_size(self):

    def test_parse_nms_url(self):

\OpenStack\cinder-2014.1\cinder\tests\test_nfs.py

class DumbVolume(object):

    def __setitem__(self, key, value):

    def __getitem__(self, item):

class RemoteFsDriverTestCase(test.TestCase):

    def setUp(self):

    def test_create_sparsed_file(self):

    def test_create_regular_file(self):

    def test_create_qcow2_file(self):

    def test_set_rw_permissions_for_all(self):

class NfsDriverTestCase(test.TestCase):

    def setUp(self):

    def stub_out_not_replaying(self, obj, attr_name):

    def test_local_path(self):

    def test_copy_image_to_volume(self):

        def fake_local_path(volume):

    def test_get_mount_point_for_share(self):

    def test_get_capacity_info(self):

    def test_get_capacity_info_for_share_and_mount_point_with_spaces(self):

    def test_load_shares_config(self):

    def test_ensure_shares_mounted_should_save_mounting_successfully(self):

    def test_ensure_shares_mounted_should_not_save_mounting_with_error(self):

    def test_setup_should_throw_error_if_shares_config_not_configured(self):

    def test_setup_should_throw_error_if_oversub_ratio_less_than_zero(self):

    def test_setup_should_throw_error_if_used_ratio_less_than_zero(self):

    def test_setup_should_throw_error_if_used_ratio_greater_than_one(self):

    def test_setup_should_throw_exception_if_nfs_client_is_not_installed(self):

    def test_find_share_should_throw_error_if_there_is_no_mounted_shares(self):

    def test_find_share(self):

    def test_find_share_should_throw_error_if_there_is_no_enough_place(self):

    def _simple_volume(self):

    def test_create_sparsed_volume(self):

    def test_create_nonsparsed_volume(self):

    def test_create_volume_should_ensure_nfs_mounted(self):

    def test_create_volume_should_return_provider_location(self):

    def test_delete_volume(self):

    def test_delete_should_ensure_share_mounted(self):

    def test_delete_should_not_delete_if_provider_location_not_provided(self):

    def test_get_volume_stats(self):

\OpenStack\cinder-2014.1\cinder\tests\test_policy.py

class PolicyFileTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_modified_policy_reloads(self):

class PolicyTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_enforce_nonexistent_action_throws(self):

    def test_enforce_bad_action_throws(self):

    def test_enforce_good_action(self):

    def test_enforce_http_true(self):

        def fakeurlopen(url, post_data):

    def test_enforce_http_false(self):

        def fakeurlopen(url, post_data):

    def test_templatized_enforcement(self):

    def test_early_AND_enforcement(self):

    def test_early_OR_enforcement(self):

    def test_ignore_case_role_check(self):

class DefaultPolicyTestCase(test.TestCase):

    def setUp(self):

    def _set_brain(self, default_rule):

    def tearDown(self):

    def test_policy_called(self):

    def test_not_found_policy_calls_default(self):

    def test_default_not_found(self):

class ContextIsAdminPolicyTestCase(test.TestCase):

    def setUp(self):

    def test_default_admin_role_is_admin(self):

    def test_custom_admin_role_is_admin(self):

    def test_context_is_admin_undefined(self):

\OpenStack\cinder-2014.1\cinder\tests\test_qos_specs.py

def fake_db_qos_specs_create(context, values):

class QoSSpecsTestCase(test.TestCase):

    def setUp(self):

    def _create_qos_specs(self, name, values=None):

    def test_create(self):

    def test_update(self):

        def fake_db_update(context, specs_id, values):

    def test_delete(self):

        def fake_db_associations_get(context, id):

        def fake_db_delete(context, id):

        def fake_disassociate_all(context, id):

    def test_delete_keys(self):

        def fake_db_qos_delete_key(context, id, key):

        def fake_qos_specs_get(context, id):

    def test_get_associations(self):

        def fake_db_associate_get(context, id):

    def test_associate_qos_with_type(self):

        def fake_qos_specs_get(context, id):

        def fake_db_associate(context, id, type_id):

        def fake_vol_type_qos_get(type_id):

    def test_disassociate_qos_specs(self):

        def fake_qos_specs_get(context, id):

        def fake_db_disassociate(context, id, type_id):

    def test_disassociate_all(self):

        def fake_db_disassociate_all(context, id):

        def fake_qos_specs_get(context, id):

    def test_get_all_specs(self):

    def test_get_qos_specs(self):

    def test_get_qos_specs_by_name(self):

\OpenStack\cinder-2014.1\cinder\tests\test_quota.py

class QuotaIntegrationTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _create_volume(self, size=1):

    def _create_snapshot(self, volume):

    def test_too_many_volumes(self):

    def test_too_many_volumes_of_type(self):

    def test_too_many_snapshots_of_type(self):

    def test_too_many_gigabytes(self):

    def test_too_many_combined_gigabytes(self):

    def test_no_snapshot_gb_quota_flag(self):

    def test_too_many_gigabytes_of_type(self):

class FakeContext(object):

    def __init__(self, project_id, quota_class):

    def elevated(self):

class FakeDriver(object):

    def __init__(self, by_project=None, by_class=None, reservations=None):

    def get_by_project(self, context, project_id, resource):

    def get_by_class(self, context, quota_class, resource):

    def get_default(self, context, resource):

    def get_defaults(self, context, resources):

    def get_class_quotas(self, context, resources, quota_class, defaults=True):

    def get_project_quotas(self, context, resources, project_id, quota_class=None, defaults=True, usages=True):

    def limit_check(self, context, resources, values, project_id=None):

    def reserve(self, context, resources, deltas, expire=None, project_id=None):

    def commit(self, context, reservations, project_id=None):

    def rollback(self, context, reservations, project_id=None):

    def destroy_all_by_project(self, context, project_id):

    def expire(self, context):

class BaseResourceTestCase(test.TestCase):

    def test_no_flag(self):

    def test_with_flag(self):

    def test_with_flag_no_quota(self):

    def test_quota_no_project_no_class(self):

    def test_quota_with_project_no_class(self):

    def test_quota_no_project_with_class(self):

    def test_quota_with_project_with_class(self):

    def test_quota_override_project_with_class(self):

    def test_quota_with_project_override_class(self):

class VolumeTypeResourceTestCase(test.TestCase):

    def test_name_and_flag(self):

class QuotaEngineTestCase(test.TestCase):

    def test_init(self):

    def test_init_override_string(self):

    def test_init_override_obj(self):

    def test_register_resource(self):

    def test_register_resources(self):

    def test_get_by_project(self):

    def test_get_by_class(self):

    def _make_quota_obj(self, driver):

    def test_get_defaults(self):

    def test_get_class_quotas(self):

    def test_get_project_quotas(self):

    def test_count_no_resource(self):

    def test_count_wrong_resource(self):

    def test_count(self):

        def fake_count(context, *args, **kwargs):

    def test_limit_check(self):

    def test_reserve(self):

    def test_commit(self):

    def test_rollback(self):

    def test_destroy_all_by_project(self):

    def test_expire(self):

    def test_resource_names(self):

class VolumeTypeQuotaEngineTestCase(test.TestCase):

    def test_default_resources(self):

        def fake_vtga(context, inactive=False, filters=None):

    def test_volume_type_resources(self):

        def fake_vtga(context, inactive=False, filters=None):

class DbQuotaDriverTestCase(test.TestCase):

    def setUp(self):

    def test_get_defaults(self):

    def _stub_quota_class_get_default(self):

        def fake_qcgd(context):

    def _stub_volume_type_get_all(self):

        def fake_vtga(context, inactive=False, filters=None):

    def _stub_quota_class_get_all_by_name(self):

        def fake_qcgabn(context, quota_class):

    def test_get_class_quotas(self):

    def test_get_class_quotas_no_defaults(self):

    def _stub_get_by_project(self):

        def fake_qgabp(context, project_id):

        def fake_qugabp(context, project_id):

    def test_get_project_quotas(self):

    def test_get_project_quotas_alt_context_no_class(self):

    def test_get_project_quotas_alt_context_with_class(self):

    def test_get_project_quotas_no_defaults(self):

    def test_get_project_quotas_no_usages(self):

    def _stub_get_project_quotas(self):

        def fake_get_project_quotas(context, resources, project_id, quota_class=None, defaults=True, usages=True):

    def test_get_quotas_has_sync_unknown(self):

    def test_get_quotas_no_sync_unknown(self):

    def test_get_quotas_has_sync_no_sync_resource(self):

    def test_get_quotas_no_sync_has_sync_resource(self):

    def test_get_quotas_has_sync(self):

    def _stub_quota_reserve(self):

        def fake_quota_reserve(context, resources, quotas, deltas, expire, until_refresh, max_age, project_id=None):

    def test_reserve_bad_expire(self):

    def test_reserve_default_expire(self):

    def test_reserve_int_expire(self):

    def test_reserve_timedelta_expire(self):

    def test_reserve_datetime_expire(self):

    def test_reserve_until_refresh(self):

    def test_reserve_max_age(self):

    def _stub_quota_destroy_all_by_project(self):

        def fake_quota_destroy_all_by_project(context, project_id):

    def test_destroy_by_project(self):

class FakeSession(object):

    def begin(self):

    def __enter__(self):

    def __exit__(self, exc_type, exc_value, exc_traceback):

class FakeUsage(sqa_models.QuotaUsage):

    def save(self, *args, **kwargs):

class QuotaReserveSqlAlchemyTestCase(test.TestCase):

    def setUp(self):

        def make_sync(res_name):

        def fake_get_session():

        def fake_get_quota_usages(context, session, project_id):

        def fake_quota_usage_create(context, project_id, resource, in_use, reserved, until_refresh, session=None, save=True):

        def fake_reservation_create(context, uuid, usage_id, project_id, resource, delta, expire, session=None):

    def _make_quota_usage(self, project_id, resource, in_use, reserved, until_refresh, created_at, updated_at):

    def init_usage(self, project_id, resource, in_use, reserved, until_refresh=None, created_at=None, updated_at=None):

    def compare_usage(self, usage_dict, expected):

    def _make_reservation(self, uuid, usage_id, project_id, resource, delta, expire, created_at, updated_at):

    def compare_reservation(self, reservations, expected):

    def test_quota_reserve_create_usages(self):

    def test_quota_reserve_negative_in_use(self):

    def test_quota_reserve_until_refresh(self):

    def test_quota_reserve_max_age(self):

    def test_quota_reserve_no_refresh(self):

    def test_quota_reserve_unders(self):

    def test_quota_reserve_overs(self):

    def test_quota_reserve_reduction(self):

\OpenStack\cinder-2014.1\cinder\tests\test_rbd.py

class MockException(Exception):

    def __init__(self, *args, **kwargs):

class MockImageNotFoundException(MockException):

class MockImageBusyException(MockException):

    def _common_inner_inner1(inst, *args, **kwargs):

        def _common_inner_inner2(mock_rados, mock_rbd, mock_client, mock_proxy):

class TestUtil(test.TestCase):

    def test_ascii_str(self):

class RBDTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_create_volume(self):

    def test_create_volume_no_layering(self):

    def test_delete_volume(self):

    def delete_volume_not_found(self):

    def test_delete_busy_volume(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_get_clone_info(self):

    def test_get_clone_info_w_snap(self):

    def test_get_clone_info_w_exception(self):

    def test_get_clone_info_deleted_volume(self):

    def test_create_cloned_volume(self):

    def test_create_cloned_volume_w_flatten(self):

    def test_create_cloned_volume_w_clone_exception(self):

    def test_good_locations(self):

    def test_bad_locations(self):

    def test_cloneable(self):

    def test_uncloneable_different_fsid(self):

    def test_uncloneable_unreadable(self):

    def test_uncloneable_bad_format(self):

    def _copy_image(self):

    def test_copy_image_no_volume_tmp(self):

    def test_copy_image_volume_tmp(self):

    def test_update_volume_stats(self):

    def test_update_volume_stats_error(self):

    def test_get_mon_addrs(self):

    def test_initialize_connection(self):

    def test_clone(self):

        def mock__enter__(inst):

    def test_extend_volume(self):

    def test_rbd_volume_proxy_init(self):

    def test_connect_to_rados(self):

class RBDImageIOWrapperTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def test_init(self):

    def test_inc_offset(self):

    def test_rbd_image(self):

    def test_rbd_user(self):

    def test_rbd_pool(self):

    def test_rbd_conf(self):

    def test_read(self):

        def mock_read(offset, length):

    def test_write(self):

    def test_seekable(self):

    def test_seek(self):

    def test_tell(self):

    def test_flush(self):

    def test_fileno(self):

    def test_close(self):

class ManagedRBDTestCase(DriverTestCase):

    def setUp(self):

    def _create_volume_from_image(self, expected_status, raw=False, clone_error=False):

    def test_create_vol_from_image_status_available(self):

        def _mock_clone_image(volume, image_location, image_id, image_meta):

    def test_create_vol_from_non_raw_image_status_available(self):

        def _mock_clone_image(volume, image_location, image_id, image_meta):

    def test_create_vol_from_image_status_error(self):

    def test_clone_failure(self):

    def test_clone_success(self):

\OpenStack\cinder-2014.1\cinder\tests\test_scality.py

class ScalityDriverTestCase(test.TestCase):

    def _makedirs(self, path):

    def _create_fake_config(self):

    def _create_fake_mount(self):

    def _remove_fake_config(self):

    def _configure_driver(self):

    def _execute_wrapper(self, cmd, *args, **kwargs):

    def _set_access_wrapper(self, is_visible):

        def _access_wrapper(path, flags):

    def setUp(self):

    def tearDown(self):

    def test_setup_no_config(self):

    def test_setup_missing_config(self):

    def test_setup_no_mount_helper(self):

    def test_setup_make_voldir(self):

    def test_local_path(self):

    def test_create_volume(self):

    def test_delete_volume(self):

    def test_create_snapshot(self):

    def test_delete_snapshot(self):

    def test_initialize_connection(self):

    def test_copy_image_to_volume(self):

    def test_copy_volume_to_image(self):

    def test_create_cloned_volume(self):

    def test_extend_volume(self):

\OpenStack\cinder-2014.1\cinder\tests\test_service.py

class FakeManager(manager.Manager):

    def __init__(self, host=None, db_driver=None, service_name=None):

    def test_method(self):

class ExtendedService(service.Service):

    def test_method(self):

class ServiceManagerTestCase(test.TestCase):

    def test_message_gets_to_manager(self):

    def test_override_manager_method(self):

class ServiceFlagsTestCase(test.TestCase):

    def test_service_enabled_on_create_based_on_flag(self):

    def test_service_disabled_on_create_based_on_flag(self):

class ServiceTestCase(test.TestCase):

    def setUp(self):

    def test_create(self):

    def test_report_state_newly_disconnected(self):

    def test_report_state_newly_connected(self):

    def test_service_with_long_report_interval(self):

class TestWSGIService(test.TestCase):

    def setUp(self):

    def test_service_random_port(self):

class OSCompatibilityTestCase(test.TestCase):

    def _test_service_launcher(self, fake_os):

    def test_process_launcher_on_windows(self):

    def test_process_launcher_on_linux(self):

\OpenStack\cinder-2014.1\cinder\tests\test_sheepdog.py

class FakeImageService:

    def download(self, context, image_id, path):

class SheepdogTestCase(test.TestCase):

    def setUp(self):

    def test_update_volume_stats(self):

        def fake_stats(*args):

    def test_update_volume_stats_error(self):

        def fake_stats(*args):

    def test_check_for_setup_error_0_5(self):

        def fake_stats(*args):

    def test_check_for_setup_error_0_6(self):

        def fake_stats(*args):

    def test_copy_image_to_volume(self):

        def fake_temp_file(dir):

        def fake_try_execute(obj, *command, **kwargs):

    def test_extend_volume(self):

\OpenStack\cinder-2014.1\cinder\tests\test_solidfire.py

def create_configuration():

class SolidFireVolumeTestCase(test.TestCase):

    def setUp(self):

    def fake_issue_api_request(obj, method, params, version='1.0'):

    def fake_issue_api_request_fails(obj, method, params, version='1.0'):

    def fake_set_qos_by_volume_type(self, type_id, ctxt):

    def fake_volume_get(obj, key, default=None):

    def fake_update_cluster_status(self):

    def fake_get_model_info(self, account, vid):

    def test_create_with_qos_type(self):

    def test_create_volume(self):

    def test_create_volume_non_512(self):

    def test_create_snapshot(self):

    def test_create_clone(self):

    def test_initialize_connector_with_blocksizes(self):

    def test_create_volume_with_qos(self):

    def test_create_volume_fails(self):

    def test_create_sfaccount(self):

    def test_create_sfaccount_fails(self):

    def test_get_sfaccount_by_name(self):

    def test_get_sfaccount_by_name_fails(self):

    def test_delete_volume(self):

    def test_delete_volume_fails_no_volume(self):

    def test_delete_volume_fails_account_lookup(self):

    def test_get_cluster_info(self):

    def test_get_cluster_info_fail(self):

    def test_extend_volume(self):

    def test_extend_volume_fails_no_volume(self):

    def test_extend_volume_fails_account_lookup(self):

    def test_set_by_qos_spec_with_scoping(self):

    def test_set_by_qos_spec(self):

    def test_set_by_qos_by_type_only(self):

    def test_retype(self):

    def test_retype_with_qos_spec(self):

        def _fake_get_volume_type(ctxt, type_id):

        def _fake_get_qos_spec(ctxt, spec_id):

\OpenStack\cinder-2014.1\cinder\tests\test_storwize_svc.py

class StorwizeSVCManagementSimulator:

    def __init__(self, pool_name):

    def _state_transition(self, function, fcmap):

    def _find_unused_id(d):

    def _is_invalid_name(name):

    def _cmd_to_dict(arg_list):

    def _print_info_cmd(rows, delim=' ', nohdr=False, **kwargs):

    def _print_info_obj_cmd(header, row, delim=' ', nohdr=False):

    def _convert_bytes_units(bytestr):

    def _convert_units_bytes(num, unit):

    def _cmd_lslicense(self, **kwargs):

    def _cmd_lssystem(self, **kwargs):

    def _cmd_lsmdiskgrp(self, **kwargs):

    def _cmd_lsnodecanister(self, **kwargs):

    def _cmd_lsnode(self, **kwargs):

    def _cmd_lsportip(self, **kwargs):

    def _cmd_lsfabric(self, **kwargs):

    def _cmd_mkvdisk(self, **kwargs):

    def _cmd_rmvdisk(self, **kwargs):

    def _cmd_expandvdisksize(self, **kwargs):

    def _get_fcmap_info(self, vol_name):

    def _cmd_lsvdisk(self, **kwargs):

    def _cmd_lsiogrp(self, **kwargs):

    def _add_port_to_host(self, host_info, **kwargs):

    def _cmd_mkhost(self, **kwargs):

    def _cmd_addhostport(self, **kwargs):

    def _cmd_chhost(self, **kwargs):

    def _cmd_rmhost(self, **kwargs):

    def _cmd_lshost(self, **kwargs):

    def _cmd_lsiscsiauth(self, **kwargs):

    def _cmd_mkvdiskhostmap(self, **kwargs):

    def _cmd_rmvdiskhostmap(self, **kwargs):

    def _cmd_lshostvdiskmap(self, **kwargs):

    def _cmd_lsvdiskhostmap(self, **kwargs):

    def _cmd_mkfcmap(self, **kwargs):

    def _cmd_prestartfcmap(self, **kwargs):

    def _cmd_startfcmap(self, **kwargs):

    def _cmd_stopfcmap(self, **kwargs):

    def _cmd_rmfcmap(self, **kwargs):

    def _cmd_lsvdiskfcmappings(self, **kwargs):

    def _cmd_chfcmap(self, **kwargs):

    def _cmd_lsfcmap(self, **kwargs):

    def _cmd_migratevdisk(self, **kwargs):

    def _cmd_addvdiskcopy(self, **kwargs):

    def _cmd_lsvdiskcopy(self, **kwargs):

    def _cmd_rmvdiskcopy(self, **kwargs):

    def _cmd_chvdisk(self, **kwargs):

    def _cmd_movevdisk(self, **kwargs):

    def _cmd_addvdiskaccess(self, **kwargs):

    def _cmd_rmvdiskaccess(self, **kwargs):

    def _add_host_to_list(self, connector):

    def _host_in_list(self, host_name):

    def execute_command(self, cmd, check_exit_code=True):

    def error_injection(self, cmd, error):

class StorwizeSVCFakeDriver(storwize_svc.StorwizeSVCDriver):

    def __init__(self, *args, **kwargs):

    def set_fake_storage(self, fake):

    def _run_ssh(self, cmd, check_exit_code=True, attempts=1):

class StorwizeSVCDriverTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def _set_flag(self, flag, value):

    def _reset_flags(self):

    def _assert_vol_exists(self, name, exists):

    def test_storwize_svc_connectivity(self):

    def _generate_vol_info(self, vol_name, vol_id):

    def _create_volume(self, **kwargs):

    def _delete_volume(self, volume):

    def _create_test_vol(self, opts):

    def test_storwize_svc_snapshots(self):

    def test_storwize_svc_create_volfromsnap_clone(self):

    def test_storwize_svc_volumes(self):

    def test_storwize_svc_volume_params(self):

    def test_storwize_svc_unicode_host_and_volume_names(self):

    def test_storwize_svc_validate_connector(self):

    def test_storwize_svc_host_maps(self):

    def test_storwize_svc_multi_host_maps(self):

    def test_storwize_svc_delete_volume_snapshots(self):

    def assertLessEqual(self, a, b, msg=None):

    def test_storwize_svc_get_volume_stats(self):

    def test_storwize_svc_extend_volume(self):

    def _check_loc_info(self, capabilities, expected):

    def test_storwize_svc_migrate_bad_loc_info(self):

    def test_storwize_svc_volume_migrate(self):

    def test_storwize_svc_retype_no_copy(self):

    def test_storwize_svc_retype_only_change_iogrp(self):

    def test_storwize_svc_retype_need_copy(self):

    def test_set_storage_code_level_success(self):

    def test_storwize_vdisk_copy_ops(self):

    def test_storwize_initiator_target_map(self):

    def _get_vdisk_uid(self, vdisk_name):

    def _create_volume_and_return_uid(self, volume_name):

    def test_manage_existing_bad_ref(self):

    def test_manage_existing_bad_uid(self):

    def test_manage_existing_good_uid_not_mapped(self):

    def test_manage_existing_good_uid_mapped(self):

    def test_manage_existing_good_uid_mapped_with_override(self):

class CLIResponseTestCase(test.TestCase):

    def test_empty(self):

    def test_header(self):

    def test_select(self):

    def test_lsnode_all(self):

    def test_lsnode_single(self):

class StorwizeHelpersTestCase(test.TestCase):

    def setUp(self):

    def test_compression_enabled(self):

\OpenStack\cinder-2014.1\cinder\tests\test_test.py

class IsolationTestCase(test.TestCase):

    def test_service_isolation(self):

    def test_rpc_consumer_isolation(self):

\OpenStack\cinder-2014.1\cinder\tests\test_test_utils.py

class TestUtilsTestCase(test.TestCase):

    def test_get_test_admin_context(self):

\OpenStack\cinder-2014.1\cinder\tests\test_utils.py

def _get_local_mock_open(fake_data='abcd efgh'):

class ExecuteTestCase(test.TestCase):

    def test_retry_on_failure(self):

    def test_unknown_kwargs_raises_error(self):

    def test_check_exit_code_boolean(self):

    def test_no_retry_on_success(self):

class GetFromPathTestCase(test.TestCase):

    def test_tolerates_nones(self):

    def test_does_select(self):

    def test_flattens_lists(self):

    def test_bad_xpath(self):

    def test_real_failure1(self):

    def test_accepts_dictionaries(self):

class GenericUtilsTestCase(test.TestCase):

    def test_find_config(self, mock_exists):

    def test_as_int(self):

    def test_check_exclusive_options(self):

    def test_require_driver_intialized(self):

    def test_hostname_unicode_sanitization(self):

    def test_hostname_sanitize_periods(self):

    def test_hostname_sanitize_dashes(self):

    def test_hostname_sanitize_characters(self):

    def test_hostname_translate(self):

    def test_generate_glance_url(self):

    def test_read_cached_file(self, mock_mtime, mock_open):

    def test_read_modified_cached_file(self, mock_mtime, mock_open):

    def test_generate_password(self):

    def test_read_file_as_root(self):

        def fake_execute(*args, **kwargs):

    def test_temporary_chown(self):

        def fake_execute(*args, **kwargs):

    def test_service_is_up(self, mock_utcnow):

    def test_safe_parse_xml(self):

        def killer_body():

    def test_xhtml_escape(self):

    def test_hash_file(self):

    def test_check_ssh_injection(self):

    def test_check_ssh_injection_on_error(self):

    def test_create_channel(self, mock_client):

    def test_get_file_mode(self, mock_stat):

    def test_get_file_gid(self, mock_stat):

class MonkeyPatchTestCase(test.TestCase):

    def setUp(self):

    def test_monkey_patch(self):

class AuditPeriodTest(test.TestCase):

    def setUp(self):

    def test_hour(self):

    def test_hour_with_offset_before_current(self):

    def test_hour_with_offset_after_current(self):

    def test_day(self):

    def test_day_with_offset_before_current(self):

    def test_day_with_offset_after_current(self):

    def test_month(self):

    def test_month_with_offset_before_current(self):

    def test_month_with_offset_after_current(self):

    def test_year(self):

    def test_year_with_offset_before_current(self):

    def test_year_with_offset_after_current(self):

class FakeSSHClient(object):

    def __init__(self):

    def set_missing_host_key_policy(self, policy):

    def connect(self, ip, port=22, username=None, password=None, pkey=None, timeout=10):

    def get_transport(self):

    def close(self):

    def __call__(self, *args, **kwargs):

class FakeSock(object):

    def settimeout(self, timeout):

class FakeTransport(object):

    def __init__(self):

    def set_keepalive(self, timeout):

    def is_active(self):

class SSHPoolTestCase(test.TestCase):

    def test_single_ssh_connect(self, mock_sshclient, mock_pkey):

    def test_closed_reopend_ssh_connections(self, mock_sshclient):

class BrickUtils(test.TestCase):

    def test_brick_get_connector_properties(self):

    def test_brick_get_connector(self):

class StringLengthTestCase(test.TestCase):

    def test_check_string_length(self):

\OpenStack\cinder-2014.1\cinder\tests\test_vmware_api.py

class RetryTest(test.TestCase):

    def test_retry(self):

        def func(*args, **kwargs):

        def func2(*args, **kwargs):

    def test_retry_with_expected_exceptions(self):

        def func(*args, **kwargs):

    def test_retry_with_max_retries(self):

        def func(*args, **kwargs):

    def test_retry_with_unexpected_exception(self):

        def func(*args, **kwargs):

class VMwareAPISessionTest(test.TestCase):

    def setUp(self):

    def _create_api_session(self, _create_session, retry_count=10, task_poll_interval=1):

    def test_create_session(self):

    def test_create_session_with_existing_session(self):

    def test_invoke_api(self):

        def api(*args, **kwargs):

    def test_invoke_api_with_expected_exception(self):

        def api(*args, **kwargs):

    def test_invoke_api_with_vim_fault_exception(self):

        def api(*args, **kwargs):

    def test_invoke_api_with_empty_response(self):

        def api(*args, **kwargs):

    def test_invoke_api_with_stale_session(self):

        def api(*args, **kwargs):

\OpenStack\cinder-2014.1\cinder\tests\test_vmware_vmdk.py

class FakeVim(object):

    def service_content(self):

    def client(self):

    def Login(self, session_manager, userName, password):

    def Logout(self, session_manager):

    def TerminateSession(self, session_manager, sessionId):

    def SessionIsActive(self, session_manager, sessionID, userName):

class FakeTaskInfo(object):

    def __init__(self, state, result=None):

class FakeMor(object):

    def __init__(self, type, val):

class FakeObject(object):

    def __init__(self):

    def __setitem__(self, key, value):

    def __getitem__(self, item):

class FakeManagedObjectReference(object):

    def __init__(self, lis=[]):

class FakeDatastoreSummary(object):

    def __init__(self, freeSpace, capacity, datastore=None, name=None):

class FakeSnapshotTree(object):

    def __init__(self, tree=None, name=None, snapshot=None, childSnapshotList=None):

class FakeElem(object):

    def __init__(self, prop_set=None):

class FakeProp(object):

    def __init__(self, name=None, val=None):

class FakeRetrieveResult(object):

    def __init__(self, objects, token):

class FakeObj(object):

    def __init__(self, obj=None):

class VMwareEsxVmdkDriverTestCase(test.TestCase):

    def setUp(self):

    def test_retry(self):

    def test_create_session(self):

    def test_do_setup(self):

    def test_check_for_setup_error(self):

    def test_get_volume_stats(self):

    def test_create_volume(self):

    def test_success_wait_for_task(self):

    def test_failed_wait_for_task(self):

    def test_delete_volume_without_backing(self):

    def test_delete_volume_with_backing(self):

    def test_create_export(self):

    def test_ensure_export(self):

    def test_remove_export(self):

    def test_terminate_connection(self):

    def test_create_backing_in_inventory_multi_hosts(self):

    def test_init_conn_with_instance_and_backing(self):

    def test_get_volume_group_folder(self):

    def test_select_datastore_summary(self):

    def test_get_folder_ds_summary(self, volumeops, session):

    def test_get_disk_type(self):

    def test_init_conn_with_instance_no_backing(self):

    def test_init_conn_without_instance(self):

    def test_create_snapshot_without_backing(self):

    def test_create_snapshot_with_backing(self):

    def test_create_snapshot_when_attached(self):

    def test_delete_snapshot_without_backing(self):

    def test_delete_snapshot_with_backing(self):

    def test_delete_snapshot_when_attached(self):

    def test_create_cloned_volume_without_backing(self, mock_vops):

    def test_create_cloned_volume_with_backing(self, mock_vops):

    def test_create_backing_by_copying(self, volumeops, create_backing, _extend_virtual_disk):

    def _test_create_backing_by_copying(self, volumeops, create_backing, _extend_virtual_disk):

    def test_create_volume_from_snapshot_without_backing(self, mock_vops):

    def test_create_volume_from_snap_without_backing_snap(self, mock_vops):

    def test_create_volume_from_snapshot(self, mock_vops):

    def test_extend_volume(self, volume_ops, _extend_virtual_disk, _select_ds_for_volume):

    def _test_extend_volume(self, volume_ops, _extend_virtual_disk, _select_ds_for_volume):

    def test_copy_image_to_volume_non_vmdk(self):

    def test_copy_image_to_volume_vmdk(self, volume_ops, session, _create_backing_in_inventory, _get_ds_name_flat_vmdk_path, _extend_vmdk_virtual_disk, fetch_flat_image):

    def _test_copy_image_to_volume_vmdk(self, volume_ops, session, _create_backing_in_inventory, _get_ds_name_flat_vmdk_path, _extend_vmdk_virtual_disk, fetch_flat_image):

    def test_copy_image_to_volume_stream_optimized(self, volumeops, session, _select_ds_for_volume, _extend_virtual_disk, fetch_optimized_image):

    def _test_copy_image_to_volume_stream_optimized(self, volumeops, session, _select_ds_for_volume, _extend_virtual_disk, fetch_optimized_image):

    def test_copy_volume_to_image_non_vmdk(self):

    def test_copy_volume_to_image_when_attached(self):

    def test_copy_volume_to_image_vmdk(self):

    def test_retrieve_properties_ex_fault_checker(self):

    def test_extend_vmdk_virtual_disk(self, volume_ops):

    def _test_extend_vmdk_virtual_disk(self, volume_ops):

class VMwareVcVmdkDriverTestCase(VMwareEsxVmdkDriverTestCase):

    def setUp(self):

    def test_get_pbm_wsdl_location(self):

        def expected_wsdl(version):

    def test_get_vc_version(self, session):

    def test_do_setup(self, session, _get_vc_version, _get_pbm_wsdl_location):

    def test_create_backing_by_copying(self, volumeops, create_backing, extend_virtual_disk):

    def test_init_conn_with_instance_and_backing(self):

    def test_get_volume_group_folder(self):

    def test_init_conn_with_instance_and_backing_and_relocation(self):

    def test_clone_backing_linked(self, volume_ops, _extend_vmdk_virtual_disk):

    def test_clone_backing_full(self, volume_ops, _select_ds_for_volume, _extend_vmdk_virtual_disk):

    def test_create_volume_from_snapshot_without_backing(self, mock_vops):

    def test_create_volume_from_snap_without_backing_snap(self, mock_vops):

    def test_create_volume_from_snapshot(self, mock_vops):

    def test_create_cloned_volume_without_backing(self, mock_vops):

    def test_create_cloned_volume_with_backing(self, mock_vops):

    def test_create_linked_cloned_volume_with_backing(self, get_clone_type, mock_vops):

    def test_create_linked_cloned_volume_when_attached(self, get_clone_type, mock_vops):

    def test_get_storage_profile(self, get_volume_type_extra_specs):

    def test_filter_ds_by_profile(self, volumeops, session, hubs_to_ds, ds_to_hubs):

    def test_get_folder_ds_summary(self, volumeops, session):

        def filter_ds(datastores, storage_profile):

    def test_extend_vmdk_virtual_disk(self, volume_ops):

    def test_copy_image_to_volume_vmdk(self, volume_ops, session, _create_backing_in_inventory, _get_ds_name_flat_vmdk_path, _extend_vmdk_virtual_disk, fetch_flat_image):

    def test_copy_image_to_volume_stream_optimized(self, volumeops, session, _select_ds_for_volume, _extend_virtual_disk, fetch_optimized_image):

    def test_extend_volume(self, volume_ops, _extend_virtual_disk, _select_ds_for_volume):

\OpenStack\cinder-2014.1\cinder\tests\test_vmware_volumeops.py

class VolumeOpsTestCase(test.TestCase):

    def setUp(self):

    def test_split_datastore_path(self):

    def vm(self, val):

    def test_get_backing(self):

    def test_delete_backing(self):

    def test_get_host(self):

    def test_get_hosts(self):

    def test_continue_retrieval(self):

    def test_cancel_retrieval(self):

    def test_is_usable(self):

    def _create_host_mounts(self, access_mode, host, set_accessible=True, is_accessible=True, mounted=True):

    def test_get_connected_hosts(self):

    def test_is_valid(self):

        def _is_valid(host_mounts, is_valid):

    def test_get_dss_rp(self):

    def test_get_parent(self):

    def test_get_dc(self):

        def mock_invoke_api(vim_util, method, vim, the_object, arg):

    def test_get_vmfolder(self):

    def test_create_folder_not_present(self):

    def test_create_folder_already_present(self):

    def test_get_create_spec(self):

    def test_create_backing(self, get_create_spec):

    def test_get_datastore(self):

    def test_get_summary(self):

    def test_get_relocate_spec(self):

    def test_relocate_backing(self, get_relocate_spec):

    def test_move_backing_to_folder(self):

    def test_create_snapshot_operation(self):

    def test_get_snapshot_from_tree(self):

    def test_get_snapshot(self):

    def test_delete_snapshot(self):

    def test_get_folder(self):

    def test_get_clone_spec(self):

    def test_clone_backing(self, get_clone_spec):

    def test_delete_file(self):

    def test_get_path_name(self):

    def test_get_entity_name(self):

    def test_get_vmdk_path(self):

    def test_copy_vmdk_file(self):

    def test_delete_vmdk_file(self):

    def test_extend_virtual_disk(self):

\OpenStack\cinder-2014.1\cinder\tests\test_volume.py

class FakeImageService:

    def __init__(self, db_driver=None, image_service=None):

    def show(self, context, image_id):

class BaseVolumeTestCase(test.TestCase):

    def setUp(self):

    def tearDown(self):

    def fake_get_target(obj, iqn):

    def fake_get_all_volume_groups(obj, vg_name=None, no_suffix=True):

class VolumeTestCase(BaseVolumeTestCase):

    def setUp(self):

    def test_init_host_clears_downloads(self):

    def test_create_driver_not_initialized(self, reserve, commit, rollback):

        def fake_reserve(context, expire=None, project_id=None, **deltas):

        def fake_commit_and_rollback(context, reservations, project_id=None):

    def test_delete_driver_not_initialized(self, reserve, commit, rollback):

OpenStack Index

Previous

Next