@

Home 

OpenStack Study: nova

OpenStack Index

Previous

Next

    def test_inject_auto_disk_config_when_present(self):

    def test_inject_auto_disk_config_none_as_false(self):

class GetConsoleOutputTestCase(VMOpsTestBase):

    def setUp(self):

    def test_get_console_output_works(self):

    def test_get_console_output_throws_nova_exception(self):

    def test_get_dom_id_works(self):

    def test_get_dom_id_works_with_rescue_vm(self):

    def test_get_dom_id_raises_not_found(self):

    def test_get_dom_id_works_with_vmref(self):

class SpawnTestCase(VMOpsTestBase):

    def _stub_out_common(self):

    def _test_spawn(self, name_label_param=None, block_device_info_param=None, rescue=False, include_root_vdi=True, throw_exception=None, attach_pci_dev=False):

    def test_spawn(self):

    def test_spawn_with_alternate_options(self):

    def test_spawn_with_pci_available_on_the_host(self):

    def test_spawn_performs_rollback_and_throws_exception(self):

    def _test_finish_migration(self, power_on=True, resize_instance=True, throw_exception=None):

    def test_finish_migration(self):

    def test_finish_migration_no_power_on(self):

    def test_finish_migrate_performs_rollback_on_error(self):

    def test_remove_hostname(self):

    def test_reset_network(self):

    def test_inject_hostname(self):

    def test_inject_hostname_with_rescue_prefix(self):

    def test_inject_hostname_with_windows_name_truncation(self):

    def test_wait_for_instance_to_start(self):

    def test_attach_orig_disk_for_rescue(self):

    def test_agent_update_setup(self):

class MigrateDiskAndPowerOffTestCase(VMOpsTestBase):

    def test_migrate_disk_and_power_off_works_down(self, migrate_up, migrate_down, *mocks):

    def test_migrate_disk_and_power_off_works_up(self, migrate_up, migrate_down, *mocks):

    def test_migrate_disk_and_power_off_resize_down_ephemeral_fails(self, migrate_up, migrate_down, *mocks):

class MigrateDiskResizingUpTestCase(VMOpsTestBase):

    def _fake_snapshot_attached_here(self, session, instance, vm_ref, label, userdevice, post_snapshot_callback):

    def test_migrate_disk_resizing_up_works_no_ephemeral(self, mock_apply_orig, mock_update_progress, mock_get_all_vdi_uuids, mock_shutdown, mock_migrate_vhd):

    def test_migrate_disk_resizing_up_works_with_two_ephemeral(self, mock_apply_orig, mock_update_progress, mock_get_all_vdi_uuids, mock_shutdown, mock_migrate_vhd):

    def test_migrate_disk_resizing_up_rollback(self, mock_restore, mock_apply_orig, mock_update_progress, mock_get_all_vdi_uuids, mock_shutdown, mock_migrate_vhd):

class CreateVMRecordTestCase(VMOpsTestBase):

    def test_create_vm_record_with_vm_device_id(self, mock_create_vm, mock_get_vm_device_id, mock_determine_vm_mode):

class BootableTestCase(VMOpsTestBase):

    def setUp(self):

    def _get_blocked(self):

    def test_acquire_bootlock(self):

    def test_release_bootlock(self):

    def test_set_bootable(self):

    def test_set_not_bootable(self):

class ResizeVdisTestCase(VMOpsTestBase):

    def test_resize_up_vdis_root(self, mock_resize):

    def test_resize_up_vdis_zero_disks(self, mock_resize):

    def test_resize_up_vdis_no_vdis_like_initial_spawn(self, mock_resize):

    def test_resize_up_vdis_ephemeral(self, mock_sizes, mock_resize):

    def test_resize_up_vdis_ephemeral_with_generate(self, mock_sizes, mock_generate, mock_resize):

class MigrateDiskResizingDownTestCase(VMOpsTestBase):

    def test_migrate_disk_resizing_down_works_no_ephemeral(self, mock_destroy_vdi, mock_migrate_vhd, mock_resize_disk, mock_get_vdi_for_vm_safely, mock_update_instance_progress, mock_apply_orig_vm_name_label, mock_resize_ensure_vm_is_shutdown):

\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_vm_utils.py

def get_fake_connection_data(sr_type):

def _get_fake_session(error=None):

def contextified(result):

def _fake_noop(*args, **kwargs):

class VMUtilsTestBase(stubs.XenAPITestBaseNoDB):

class LookupTestCase(VMUtilsTestBase):

    def setUp(self):

    def _do_mock(self, result):

    def test_normal(self):

    def test_no_result(self):

    def test_too_many(self):

    def test_rescue_none(self):

    def test_rescue_found(self):

    def test_rescue_too_many(self):

class GenerateConfigDriveTestCase(VMUtilsTestBase):

    def test_no_admin_pass(self):

    def test_vdi_cleaned_up(self, mock_find, mock_create_vdi, mock_attached, mock_destroy):

class XenAPIGetUUID(VMUtilsTestBase):

    def test_get_this_vm_uuid_new_kernel(self):

    def test_get_this_vm_uuid_old_kernel_reboot(self):

class FakeSession(object):

    def call_xenapi(self, *args):

    def call_plugin(self, *args):

    def call_plugin_serialized(self, plugin, fn, *args, **kwargs):

    def call_plugin_serialized_with_retry(self, plugin, fn, num_retries, callback, *args, **kwargs):

class FetchVhdImageTestCase(VMUtilsTestBase):

    def setUp(self):

    def _stub_glance_download_vhd(self, raise_exc=None):

    def _stub_bittorrent_download_vhd(self, raise_exc=None):

    def test_fetch_vhd_image_works_with_glance(self):

    def test_fetch_vhd_image_works_with_bittorrent(self):

    def test_fetch_vhd_image_cleans_up_vdi_on_fail(self):

    def test_fallback_to_default_handler(self):

    def test_default_handler_doesnt_fallback_to_itself(self):

class TestImageCompression(VMUtilsTestBase):

    def test_image_compression(self):

class ResizeHelpersTestCase(VMUtilsTestBase):

    def test_repair_filesystem(self):

    def _call_tune2fs_remove_journal(self, path):

    def _call_tune2fs_add_journal(self, path):

    def _call_parted(self, path, start, end):

    def test_resize_part_and_fs_down_succeeds(self):

    def test_log_progress_if_required(self):

    def test_log_progress_if_not_required(self):

    def test_resize_part_and_fs_down_fails_disk_too_big(self):

    def test_resize_part_and_fs_up_succeeds(self):

    def test_resize_disk_throws_on_zero_size(self):

    def test_auto_config_disk_returns_early_on_zero_size(self):

class CheckVDISizeTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_not_too_large(self):

    def test_too_large(self):

    def test_zero_root_gb_disables_check(self):

class GetInstanceForVdisForSrTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_get_instance_vdis_for_sr(self):

    def test_get_instance_vdis_for_sr_no_vbd(self):

    def test_get_vdi_uuid_for_volume_with_sr_uuid(self):

    def test_get_vdi_uuid_for_volume_failure(self):

        def bad_introduce_sr(session, sr_uuid, label, sr_params):

    def test_get_vdi_uuid_for_volume_from_iscsi_vol_missing_sr_uuid(self):

class VMRefOrRaiseVMFoundTestCase(VMUtilsTestBase):

    def test_lookup_call(self):

    def test_return_value(self):

class VMRefOrRaiseVMNotFoundTestCase(VMUtilsTestBase):

    def test_exception_raised(self):

    def test_exception_msg_contains_vm_name(self):

class BittorrentTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_image_uses_bittorrent(self):

    def _test_create_image(self, cache_type):

        def fake_create_cached_image(*args):

        def fake_fetch_image(*args):

    def test_create_image_cached(self):

    def test_create_image_uncached(self):

class ShutdownTestCase(VMUtilsTestBase):

    def test_hardshutdown_should_return_true_when_vm_is_shutdown(self):

    def test_cleanshutdown_should_return_true_when_vm_is_shutdown(self):

class CreateVBDTestCase(VMUtilsTestBase):

    def setUp(self):

    def _generate_vbd_rec(self):

    def test_create_vbd_default_args(self):

    def test_create_vbd_osvol(self):

    def test_create_vbd_extra_args(self):

    def test_attach_cd(self):

class UnplugVbdTestCase(VMUtilsTestBase):

    def test_unplug_vbd_works(self, mock_sleep):

    def test_unplug_vbd_raises_unexpected_error(self):

    def test_unplug_vbd_already_detached_works(self):

    def test_unplug_vbd_already_raises_unexpected_xenapi_error(self):

    def _test_uplug_vbd_retries(self, mock_sleep, error):

    def test_uplug_vbd_retries_on_rejected(self, mock_sleep):

    def test_uplug_vbd_retries_on_internal_error(self, mock_sleep):

class VDIOtherConfigTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_create_vdi(self):

    def test_create_image(self):

        def fake_fetch_image(*args):

        def VDI_add_to_other_config(ref, key, value):

    def test_import_migrated_vhds(self):

        def VDI_add_to_other_config(ref, key, value):

        def call_plugin_serialized(*args, **kwargs):

class GenerateDiskTestCase(VMUtilsTestBase):

    def setUp(self):

    def tearDown(self):

    def _expect_parted_calls(self):

    def _check_vdi(self, vdi_ref, check_attached=True):

    def test_generate_disk_with_no_fs_given(self):

    def test_generate_disk_swap(self):

    def test_generate_disk_ephemeral(self):

    def test_generate_disk_ensure_cleanup_called(self):

    def test_generate_disk_ephemeral_local_not_attached(self):

class GenerateEphemeralTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_get_ephemeral_disk_sizes_simple(self):

    def test_get_ephemeral_disk_sizes_three_disks_2000(self):

    def test_get_ephemeral_disk_sizes_two_disks_1024(self):

    def _expect_generate_disk(self, size, device, name_label):

    def test_generate_ephemeral_adds_one_disk(self):

    def test_generate_ephemeral_adds_multiple_disks(self):

    def test_generate_ephemeral_cleans_up_on_error(self):

class FakeFile(object):

    def __init__(self):

    def seek(self, offset):

class StreamDiskTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_non_ami(self):

    def test_ami_disk(self):

class VMUtilsSRPath(VMUtilsTestBase):

    def setUp(self):

    def test_defined(self):

    def test_default(self):

class CreateKernelRamdiskTestCase(VMUtilsTestBase):

    def setUp(self):

    def test_create_kernel_and_ramdisk_no_create(self):

    def test_create_kernel_and_ramdisk_create_both_cached(self):

    def test_create_kernel_and_ramdisk_create_kernel_not_cached(self):

class ScanSrTestCase(VMUtilsTestBase):

    def test_scan_default_sr(self, mock_safe_find_sr, mock_scan_sr):

    def test_scan_sr_works(self):

    def test_scan_sr_unknown_error_fails_once(self):

    def test_scan_sr_known_error_retries_then_throws(self, mock_sleep):

    def test_scan_sr_known_error_retries_then_succeeds(self, mock_sleep):

        def fake_call_xenapi(*args):

class CreateVmTestCase(VMUtilsTestBase):

    def test_vss_provider(self, mock_extract):

    def test_invalid_cpu_mask_raises(self, mock_extract):

    def test_destroy_vm(self, mock_extract):

    def test_destroy_vm_silently_fails(self, mock_extract):

class DetermineVmModeTestCase(VMUtilsTestBase):

    def test_determine_vm_mode_returns_xen_mode(self):

    def test_determine_vm_mode_returns_hvm_mode(self):

    def test_determine_vm_mode_returns_xen_for_linux(self):

    def test_determine_vm_mode_returns_hvm_for_windows(self):

    def test_determine_vm_mode_returns_hvm_by_default(self):

    def test_determine_vm_mode_returns_xen_for_VHD(self):

    def test_determine_vm_mode_returns_xen_for_DISK(self):

class CallXenAPIHelpersTestCase(VMUtilsTestBase):

    def test_vm_get_vbd_refs(self):

    def test_vbd_get_rec(self):

    def test_vdi_get_rec(self):

    def test_vdi_snapshot(self):

    def test_vdi_get_virtual_size(self):

    def test_vdi_resize(self, mock_get_resize_func_name):

    def test_update_vdi_virtual_size_works(self, mock_get_size, mock_resize):

    def test_update_vdi_virtual_size_skips_resize_down(self, mock_get_size, mock_resize):

    def test_update_vdi_virtual_size_raise_if_disk_big(self, mock_get_size, mock_resize):

class GetVdiForVMTestCase(VMUtilsTestBase):

    def test_get_vdi_for_vm_safely(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_rec):

    def test_get_vdi_for_vm_safely_fails(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_rec):

class GetAllVdiForVMTestCase(VMUtilsTestBase):

    def _setup_get_all_vdi_uuids_for_vm(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_uuid):

        def fake_vbd_get_rec(session, vbd_ref):

        def fake_vdi_get_uuid(session, vdi_ref):

    def test_get_all_vdi_uuids_for_vm_works(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_uuid):

    def test_get_all_vdi_uuids_for_vm_finds_none(self, vm_get_vbd_refs, vbd_get_rec, vdi_get_uuid):

class GetAllVdisTestCase(VMUtilsTestBase):

    def test_get_all_vdis_in_sr(self):

        def fake_get_rec(record_type, ref):

class VDIAttachedHere(VMUtilsTestBase):

    def test_sync_called(self, mock_execute, mock_wait_for_device, mock_remap_vbd_dev, mock_create_vbd, mock_get_this_vm_ref, mock_destroy_vbd):

class SnapshotAttachedHereTestCase(VMUtilsTestBase):

    def test_snapshot_attached_here(self, mock_impl):

        def fake_impl(session, instance, vm_ref, label, userdevice, post_snapshot_callback):

    def test_snapshot_attached_here_impl(self, mock_get_vdi_for_vm_safely, mock_get_vhd_parent_uuid, mock_vdi_snapshot, mock_vdi_get_uuid, mock_wait_for_vhd_coalesce, mock_walk_vdi_chain, mock_safe_destroy_vdis):

    def test_wait_for_vhd_coalesce(self, mock_scan_sr, mock_another_child_vhd, mock_get_vhd_parent_uuid, mock_sleep):

        def fake_scan_sr(session, sr_ref):

        def fake_get_vhd_parent_uuid(session, vdi_ref):

        def fake_call_xenapi(method, *args):

class ImportMigratedDisksTestCase(VMUtilsTestBase):

    def test_import_all_migrated_disks(self, mock_root, mock_ephemeral):

    def test_import_migrated_root_disk(self, mock_migrate):

    def test_import_migrate_ephemeral_disks(self, mock_migrate):

    def test_import_migrated_vhds(self, mock_get_sr_path, mock_scan_sr, mock_set_info):

    def test_get_vhd_parent_uuid_rec_provided(self):

class MigrateVHDTestCase(VMUtilsTestBase):

    def _assert_transfer_called(self, session, label):

    def test_migrate_vhd_root(self):

    def test_migrate_vhd_ephemeral(self):

    def test_migrate_vhd_converts_exceptions(self):

class StripBaseMirrorTestCase(VMUtilsTestBase):

    def test_strip_base_mirror_from_vdi_works(self):

    def test_strip_base_mirror_from_vdi_hides_error(self):

    def test_strip_base_mirror_from_vdis(self, mock_strip):

        def call_xenapi(method, arg):

class DeviceIdTestCase(VMUtilsTestBase):

    def test_device_id_is_none_if_not_specified_in_meta_data(self):

    def test_get_device_id_if_hypervisor_version_is_greater_than_6_1(self):

    def test_raise_exception_if_device_id_not_supported_by_hyp_version(self):

class CreateVmRecordTestCase(VMUtilsTestBase):

    def test_create_vm_record_linux(self, mock_extract_flavor):

    def test_create_vm_record_windows(self, mock_extract_flavor):

    def _test_create_vm_record(self, mock_extract_flavor, instance, is_viridian):

    def test_list_vms(self):

class ResizeFunctionTestCase(test.NoDBTestCase):

    def _call_get_resize_func_name(self, brand, version):

    def _test_is_resize(self, brand, version):

    def _test_is_resize_online(self, brand, version):

    def test_xenserver_5_5(self):

    def test_xenserver_6_0(self):

    def test_xcp_1_1(self):

    def test_xcp_1_2(self):

    def test_xcp_2_0(self):

    def test_random_brand(self):

    def test_default(self):

    def test_empty(self):

    def test_bad_version(self):

class VMInfoTests(VMUtilsTestBase):

    def setUp(self):

    def test_get_power_state_valid(self):

    def test_get_power_state_invalid(self):

    def test_compile_info(self):

        def call_xenapi(method, *args):

\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_volumeops.py

class VolumeAttachTestCase(test.NoDBTestCase):

    def test_detach_volume_call(self):

        def regcall(label):

    def test_attach_volume_call(self):

    def test_attach_volume_no_hotplug(self):

    def _test_connect_volume(self, hotplug, vm_running, plugged):

    def test_connect_volume_no_hotplug_vm_running(self):

    def test_connect_volume_no_hotplug_vm_not_running(self):

    def test_connect_volume_hotplug_vm_stopped(self):

    def test_connect_volume_hotplug_vm_running(self):

    def test_connect_volume(self):

        def fake_call_xenapi(self, method, *args, **kwargs):

\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_volume_utils.py

class SROps(stubs.XenAPITestBaseNoDB):

    def test_find_sr_valid_uuid(self):

    def test_find_sr_invalid_uuid(self):

class ISCSIParametersTestCase(stubs.XenAPITestBaseNoDB):

    def test_target_host(self):

    def test_target_port(self):

class IntroduceTestCase(stubs.XenAPITestBaseNoDB):

    def test_introduce_vdi_retry(self, mock_sleep, mock_get_vdi_ref):

        def fake_get_vdi_ref(session, sr_ref, vdi_uuid, target_lun):

        def fake_call_xenapi(method, *args):

    def test_introduce_vdi_exception(self, mock_sleep, mock_get_vdi_ref):

        def fake_call_xenapi(method, *args):

\OpenStack\nova-2014.1\nova\tests\virt\xenapi\test_xenapi.py

def get_session():

def set_image_fixtures():

def get_fake_device_info():

def stub_vm_utils_with_vdi_attached_here(function):

    def decorated_function(self, *args, **kwargs):

def create_instance_with_system_metadata(context, instance_values):

class XenAPIVolumeTestCase(stubs.XenAPITestBaseNoDB):

    def setUp(self):

    def _make_connection_info(cls):

    def test_mountpoint_to_number(self):

    def test_parse_volume_info_parsing_auth_details(self):

    def test_get_device_number_raise_exception_on_wrong_mountpoint(self):

    def test_attach_volume(self):

    def test_attach_volume_raise_exception(self):

class XenAPIVMTestCase(stubs.XenAPITestBase):

    def setUp(self):

        def fake_inject_instance_metadata(self, instance, vm):

        def fake_safe_copy_vdi(session, sr_ref, instance, vdi_to_copy_ref):

    def tearDown(self):

    def test_init_host(self):

    def test_instance_exists(self):

    def test_instance_not_exists(self):

    def test_list_instances_0(self):

    def test_list_instance_uuids_0(self):

    def test_list_instance_uuids(self):

    def test_get_rrd_server(self):

    def test_get_diagnostics(self):

        def fake_get_rrd(host, vm_uuid):

    def test_get_vnc_console(self):

    def test_get_vnc_console_for_rescue(self):

    def test_get_vnc_console_instance_not_ready(self):

    def test_get_vnc_console_rescue_not_ready(self):

    def test_instance_snapshot_fails_with_no_primary_vdi(self):

        def create_bad_vbd(session, vm_ref, vdi_ref, userdevice, vbd_type='disk', read_only=False, bootable=False, osvol=False):

    def test_instance_snapshot(self):

        def fake_image_upload(_self, ctx, session, inst, vdi_uuids, img_id):

    def create_vm_record(self, conn, os_type, name):

    def check_vm_record(self, conn, instance_type_id, check_injection):

    def check_vm_params_for_windows(self):

    def check_vm_params_for_linux(self):

    def check_vm_params_for_linux_with_external_kernel(self):

    def _list_vdis(self):

    def _list_vms(self):

    def _check_vdis(self, start_list, end_list):

    def _test_spawn(self, image_ref, kernel_id, ramdisk_id, instance_type_id="3", os_type="linux", hostname="test", architecture="x86-64", instance_id=1, injected_files=None, check_injection=False, create_record=True, empty_dns=False, block_device_info=None, key_data=None):

        def fake_inject_instance_metadata(self, instance, vm):

    def test_spawn_ipxe_iso_success(self):

    def test_spawn_ipxe_iso_no_network_name(self):

    def test_spawn_ipxe_iso_no_boot_menu_url(self):

    def test_spawn_ipxe_iso_unknown_network_name(self):

    def test_spawn_empty_dns(self):

    def test_spawn_not_enough_memory(self):

    def test_spawn_fail_cleanup_1(self):

    def test_spawn_fail_cleanup_2(self):

    def test_spawn_fail_cleanup_3(self):

    def test_spawn_raw_glance(self):

    def test_spawn_vhd_glance_linux(self):

    def test_spawn_vhd_glance_windows(self):

    def test_spawn_iso_glance(self):

    def test_spawn_glance(self):

        def fake_fetch_disk_image(context, session, instance, name_label, image_id, image_type):

    def test_spawn_boot_from_volume_no_image_meta(self):

    def test_spawn_boot_from_volume_no_glance_image_meta(self):

    def test_spawn_boot_from_volume_with_image_meta(self):

    def test_spawn_netinject_file(self):

        def _tee_handler(cmd, **kwargs):

        def _readlink_handler(cmd_parts, **kwargs):

    def test_spawn_netinject_xenstore(self):

        def _mount_handler(cmd, *ignore_args, **ignore_kwargs):

        def _umount_handler(cmd, *ignore_args, **ignore_kwargs):

        def _tee_handler(cmd, *ignore_args, **ignore_kwargs):

    def test_spawn_injects_auto_disk_config_to_xenstore(self):

    def test_spawn_vlanmanager(self):

        def dummy(*args, **kwargs):

    def test_spawn_with_network_qos(self):

    def test_spawn_ssh_key_injection(self):

        def fake_inject_file(self, method, args):

        def fake_encrypt_text(sshkey, new_pass):

    def test_spawn_ssh_key_injection_non_rsa(self):

        def fake_inject_file(self, method, args):

        def fake_encrypt_text(sshkey, new_pass):

    def test_spawn_injected_files(self):

        def fake_inject_file(self, method, args):

    def test_spawn_agent_upgrade(self):

        def fake_agent_build(_self, *args):

    def test_spawn_agent_upgrade_fails_silently(self):

        def fake_agent_build(_self, *args):

        def fake_agent_update(self, method, args):

    def test_spawn_with_resetnetwork_alternative_returncode(self):

        def fake_resetnetwork(self, method, args):

    def _test_spawn_fails_silently_with(self, trigger, expected_exception):

        def fake_agent_version(self, method, args):

        def fake_add_instance_fault(*args, **kwargs):

    def test_spawn_fails_with_agent_timeout(self):

    def test_spawn_fails_with_agent_not_implemented(self):

    def test_spawn_fails_with_agent_error(self):

    def test_spawn_fails_with_agent_bad_return(self):

        def fake_agent_version(self, method, args):

    def test_spawn_fails_agent_not_implemented(self):

        def fake_agent_version(self, method, args):

    def test_rescue(self):

    def test_rescue_preserve_disk_on_failure(self):

        def fake_start(*args, **kwargs):

    def test_unrescue(self):

    def test_unrescue_not_in_rescue(self):

    def test_finish_revert_migration(self):

    def test_reboot_hard(self):

    def test_poll_rebooting_instances(self):

    def test_reboot_soft(self):

    def test_reboot_halted(self):

    def test_reboot_unknown_state(self):

    def test_reboot_rescued(self):

    def test_get_console_output_succeeds(self):

        def fake_get_console_output(instance):

    def _test_maintenance_mode(self, find_host, find_aggregate):

        def fake_call_xenapi(method, *args):

        def fake_aggregate_get(context, host, key):

        def fake_host_find(context, session, src, dst):

    def test_maintenance_mode(self):

    def test_maintenance_mode_no_host(self):

    def test_maintenance_mode_no_aggregate(self):

    def test_uuid_find(self):

    def test_session_virtapi(self):

        def fake_aggregate_get_by_host(self, *args, **kwargs):

    def test_per_instance_usage_running(self):

    def test_per_instance_usage_suspended(self):

    def test_per_instance_usage_halted(self):

    def _create_instance(self, instance_id=1, spawn=True, obj=False, **attrs):

    def test_destroy_clean_up_kernel_and_ramdisk(self):

        def fake_lookup_kernel_ramdisk(session, vm_ref):

        def fake_destroy_kernel_ramdisk(session, instance, kernel, ramdisk):

class XenAPIDiffieHellmanTestCase(test.NoDBTestCase):

    def setUp(self):

    def test_shared(self):

    def _test_encryption(self, message):

    def test_encrypt_simple_message(self):

    def test_encrypt_message_with_newlines_at_end(self):

    def test_encrypt_many_newlines_at_end(self):

    def test_encrypt_newlines_inside_message(self):

    def test_encrypt_with_leading_newlines(self):

    def test_encrypt_really_long_message(self):

class XenAPIMigrateInstance(stubs.XenAPITestBase):

    def setUp(self):

        def fake_inject_instance_metadata(self, instance, vm):

    def test_migrate_disk_and_power_off(self):

    def test_migrate_disk_and_power_off_passes_exceptions(self):

        def fake_raise(*args, **kwargs):

    def test_migrate_disk_and_power_off_throws_on_zero_gb_resize_down(self):

    def test_migrate_disk_and_power_off_with_zero_gb_old_and_new_works(self):

    def _test_revert_migrate(self, power_on):

        def fake_vm_start(*args, **kwargs):

        def fake_vdi_resize(*args, **kwargs):

        def fake_finish_revert_migration(*args, **kwargs):

    def test_revert_migrate_power_on(self):

    def test_revert_migrate_power_off(self):

    def _test_finish_migrate(self, power_on):

        def fake_vm_start(*args, **kwargs):

        def fake_vdi_resize(*args, **kwargs):

    def test_finish_migrate_power_on(self):

    def test_finish_migrate_power_off(self):

    def test_finish_migrate_no_local_storage(self):

        def fake_vdi_resize(*args, **kwargs):

    def test_finish_migrate_no_resize_vdi(self):

        def fake_vdi_resize(*args, **kwargs):

    def test_migrate_too_many_partitions_no_resize_down(self):

        def fake_get_partitions(partition):

    def test_migrate_bad_fs_type_no_resize_down(self):

        def fake_get_partitions(partition):

    def test_migrate_rollback_when_resize_down_fs_fails(self):

    def test_resize_ensure_vm_is_shutdown_cleanly(self):

    def test_resize_ensure_vm_is_shutdown_forced(self):

    def test_resize_ensure_vm_is_shutdown_fails(self):

    def test_resize_ensure_vm_is_shutdown_already_shutdown(self):

class XenAPIImageTypeTestCase(test.NoDBTestCase):

    def test_to_string(self):

    def _assert_role(self, expected_role, image_type_id):

    def test_get_image_role_kernel(self):

    def test_get_image_role_ramdisk(self):

    def test_get_image_role_disk(self):

    def test_get_image_role_disk_raw(self):

    def test_get_image_role_disk_vhd(self):

class XenAPIDetermineDiskImageTestCase(test.NoDBTestCase):

    def assert_disk_type(self, image_meta, expected_disk_type):

    def test_machine(self):

    def test_raw(self):

    def test_vhd(self):

    def test_none(self):

class XenAPIHostTestCase(stubs.XenAPITestBase):

    def setUp(self):

    def test_host_state(self):

    def test_host_state_vcpus_used(self):

    def test_pci_passthrough_devices_whitelist(self):

    def test_pci_passthrough_devices_no_whitelist(self):

    def test_host_state_missing_sr(self):

        def fake_safe_find_sr(session):

    def _test_host_action(self, method, action, expected=None):

    def test_host_reboot(self):

    def test_host_shutdown(self):

    def test_host_startup(self):

    def test_host_maintenance_on(self):

    def test_host_maintenance_off(self):

    def test_set_enable_host_enable(self):

    def test_set_enable_host_disable(self):

    def test_get_host_uptime(self):

    def test_supported_instances_is_included_in_host_state(self):

    def test_supported_instances_is_calculated_by_to_supported_instances(self):

        def to_supported_instances(somedata):

    def test_update_stats_caches_hostname(self):

class ToSupportedInstancesTestCase(test.NoDBTestCase):

    def test_default_return_value(self):

    def test_return_value(self):

    def test_invalid_values_do_not_break(self):

    def test_multiple_values(self):

class XenAPIAutoDiskConfigTestCase(stubs.XenAPITestBase):

    def setUp(self):

        def fake_create_vbd(session, vm_ref, vdi_ref, userdevice, vbd_type='disk', read_only=False, bootable=True, osvol=False):

    def assertIsPartitionCalled(self, called):

        def fake_resize_part_and_fs(dev, start, old, new):

    def test_instance_not_auto_disk_config(self):

    def test_instance_auto_disk_config_doesnt_pass_fail_safes(self):

        def fake_get_partitions(dev):

    def test_instance_auto_disk_config_passes_fail_safes(self):

        def fake_get_partitions(dev):

class XenAPIGenerateLocal(stubs.XenAPITestBase):

    def setUp(self):

        def fake_create_vbd(session, vm_ref, vdi_ref, userdevice, vbd_type='disk', read_only=False, bootable=True, osvol=False, empty=False, unpluggable=True):

    def assertCalled(self, instance, disk_image_type=vm_utils.ImageType.DISK_VHD):

    def test_generate_swap(self):

        def fake_generate_swap(*args, **kwargs):

    def test_generate_ephemeral(self):

        def fake_generate_ephemeral(*args):

    def test_generate_iso_blank_root_disk(self):

        def fake_generate_ephemeral(*args):

        def fake_generate_iso(*args):

class XenAPIBWCountersTestCase(stubs.XenAPITestBaseNoDB):

    def setUp(self):

        def _fake_get_vif_device_map(vm_rec):

    def _fake_list_vms(cls, session):

    def _fake_fetch_bandwidth_mt(session):

    def _fake_fetch_bandwidth(session):

    def test_get_all_bw_counters(self):

    def test_get_all_bw_counters_in_failure_case(self):

class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase):

    def setUp(self):

    def _create_instance_ref(self):

    def _create_test_security_group(self):

    def _validate_security_group(self):

    def test_static_filters(self):

    def test_filters_for_instance_with_ip_v6(self):

    def test_filters_for_instance_without_ip_v6(self):

    def test_multinic_iptables(self):

    def test_do_refresh_security_group_rules(self):

    def test_provider_firewall_rules(self):

class XenAPISRSelectionTestCase(stubs.XenAPITestBaseNoDB):

    def test_safe_find_sr_raise_exception(self):

    def test_safe_find_sr_local_storage(self):

    def test_safe_find_sr_by_other_criteria(self):

    def test_safe_find_sr_default(self):

def _create_service_entries(context, values={'avail_zone1': ...}):

class XenAPIAggregateTestCase(stubs.XenAPITestBase):

    def setUp(self):

    def test_pool_add_to_aggregate_called_by_driver(self):

        def pool_add_to_aggregate(context, aggregate, host, slave_info=None):

    def test_pool_remove_from_aggregate_called_by_driver(self):

        def pool_remove_from_aggregate(context, aggregate, host, slave_info=None):

    def test_add_to_aggregate_for_first_host_sets_metadata(self):

        def fake_init_pool(id, name):

    def test_join_slave(self):

        def fake_join_slave(id, compute_uuid, host, url, user, password):

    def test_add_to_aggregate_first_host(self):

        def fake_pool_set_name_label(self, session, pool_ref, name):

    def test_remove_from_aggregate_called(self):

        def fake_remove_from_aggregate(context, aggregate, host):

    def test_remove_from_empty_aggregate(self):

    def test_remove_slave(self):

        def fake_eject_slave(id, compute_uuid, host_uuid):

    def test_remove_master_solo(self):

        def fake_clear_pool(id):

    def test_remote_master_non_empty_pool(self):

    def _aggregate_setup(self, aggr_name='fake_aggregate', aggr_zone='fake_zone', aggr_state=pool_states.CREATED, hosts=['host'], metadata=None):

    def test_add_host_to_aggregate_invalid_changing_status(self):

    def test_add_host_to_aggregate_invalid_dismissed_status(self):

    def test_add_host_to_aggregate_invalid_error_status(self):

    def test_remove_host_from_aggregate_error(self):

    def test_remove_host_from_aggregate_invalid_dismissed_status(self):

    def test_remove_host_from_aggregate_invalid_changing_status(self):

    def test_add_aggregate_host_raise_err(self):

        def fake_driver_add_to_aggregate(context, aggregate, host, **_ignore):

class MockComputeAPI(object):

    def __init__(self):

    def add_aggregate_host(self, ctxt, aggregate, host_param, host, slave_info):

    def remove_aggregate_host(self, ctxt, aggregate_id, host_param, host, slave_info):

class StubDependencies(object):

    def __init__(self):

    def _is_hv_pool(self, *_ignore):

    def _get_metadata(self, *_ignore):

    def _create_slave_info(self, *ignore):

class ResourcePoolWithStubs(StubDependencies, pool.ResourcePool):

class HypervisorPoolTestCase(test.NoDBTestCase):

    def test_slave_asks_master_to_add_slave_to_pool(self):

    def test_slave_asks_master_to_remove_slave_from_pool(self):

class SwapXapiHostTestCase(test.NoDBTestCase):

    def test_swapping(self):

    def test_no_port(self):

    def test_no_path(self):

class XenAPILiveMigrateTestCase(stubs.XenAPITestBaseNoDB):

    def setUp(self):

    def test_live_migration_calls_vmops(self):

        def fake_live_migrate(context, instance_ref, dest, post_method, recover_method, block_migration, migrate_data):

    def test_pre_live_migration(self):

    def test_post_live_migration_at_destination(self):

        def fake_fw(instance, network_info):

        def fake_create_kernel_and_ramdisk(context, session, instance, name_label):

        def fake_get_vm_opaque_ref(instance):

        def fake_strip_base_mirror_from_vdis(session, vm_ref):

    def test_check_can_live_migrate_destination_with_block_migration(self):

    def test_check_live_migrate_destination_verifies_ip(self):

    def test_check_can_live_migrate_destination_block_migration_fails(self):

    def _add_default_live_migrate_stubs(self, conn):

        def fake_generate_vdi_map(destination_sr_ref, _vm_ref):

        def fake_get_iscsi_srs(destination_sr_ref, _vm_ref):

        def fake_get_vm_opaque_ref(instance):

        def fake_lookup_kernel_ramdisk(session, vm):

    def test_check_can_live_migrate_source_with_block_migrate(self):

    def test_check_can_live_migrate_source_with_block_migrate_iscsi(self):

        def fake_get_iscsi_srs(destination_sr_ref, _vm_ref):

        def fake_make_plugin_call(plugin, method, **args):

    def test_check_can_live_migrate_source_with_block_iscsi_fails(self):

        def fake_get_iscsi_srs(destination_sr_ref, _vm_ref):

        def fake_make_plugin_call(plugin, method, **args):

    def test_check_can_live_migrate_source_with_block_migrate_fails(self):

    def test_check_can_live_migrate_works(self):

        def fake_aggregate_get_by_host(context, host, key=None):

    def test_check_can_live_migrate_fails(self):

        def fake_aggregate_get_by_host(context, host, key=None):

    def test_live_migration(self):

        def fake_get_vm_opaque_ref(instance):

        def fake_get_host_opaque_ref(context, destination_hostname):

        def post_method(context, instance, destination_hostname, block_migration, migrate_data):

    def test_live_migration_on_failure(self):

        def fake_get_vm_opaque_ref(instance):

        def fake_get_host_opaque_ref(context, destination_hostname):

        def fake_call_xenapi(*args):

        def recover_method(context, instance, destination_hostname, block_migration):

    def test_live_migration_calls_post_migration(self):

        def post_method(context, instance, destination_hostname, block_migration, migrate_data):

    def test_live_migration_block_cleans_srs(self):

        def fake_get_iscsi_srs(context, instance):

        def fake_forget_sr(context, instance):

        def post_method(context, instance, destination_hostname, block_migration, migrate_data):

    def test_live_migration_with_block_migration_raises_invalid_param(self):

        def recover_method(context, instance, destination_hostname, block_migration):

    def test_live_migration_with_block_migration_fails_migrate_send(self):

        def recover_method(context, instance, destination_hostname, block_migration):

    def test_live_migrate_block_migration_xapi_call_parameters(self):

        def fake_generate_vdi_map(destination_sr_ref, _vm_ref):

        def dummy_callback(*args, **kwargs):

    def test_live_migrate_pool_migration_xapi_call_parameters(self):

        def fake_get_host_opaque_ref(context, destination):

        def dummy_callback(*args, **kwargs):

    def test_generate_vdi_map(self):

        def fake_find_sr(_session):

        def fake_get_instance_vdis_for_sr(_session, _vm_ref, _sr_ref):

    def test_rollback_live_migration_at_destination(self):

class XenAPIInjectMetadataTestCase(stubs.XenAPITestBaseNoDB):

    def setUp(self):

        def fake_get_vm_opaque_ref(inst, instance):

        def fake_add_to_param_xenstore(inst, vm_ref, key, val):

        def fake_remove_from_param_xenstore(inst, vm_ref, key):

        def fake_write_to_xenstore(inst, instance, path, value, vm_ref=None):

        def fake_delete_from_xenstore(inst, instance, path, vm_ref=None):

    def test_inject_instance_metadata(self):

    def test_change_instance_metadata_add(self):

    def test_change_instance_metadata_update(self):

    def test_change_instance_metadata_delete(self):

    def test_change_instance_metadata_not_found(self):

class XenAPISessionTestCase(test.NoDBTestCase):

    def _get_mock_xapisession(self, software_version):

    def test_local_session(self):

    def test_remote_session(self):

    def test_get_product_version_product_brand_does_not_fail(self):

    def test_get_product_version_product_brand_xs_6(self):

    def test_verify_plugin_version_same(self):

    def test_verify_plugin_version_compatible(self):

    def test_verify_plugin_version_bad_maj(self):

    def test_verify_plugin_version_bad_min(self):

    def test_verify_current_version_matches(self):

class XenAPIFakeTestCase(test.NoDBTestCase):

    def test_query_matches(self):

    def test_query_bad_format(self):

\OpenStack\nova-2014.1\nova\tests\virt\xenapi\__init__.py

\OpenStack\nova-2014.1\nova\tests\virt\__init__.py

\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_base.py

class VolumeEncryptorTestCase(test.TestCase):

    def _create(self, device_path):

    def setUp(self):

\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_cryptsetup.py

def fake__get_key(context):

class CryptsetupEncryptorTestCase(test_base.VolumeEncryptorTestCase):

    def _create(self, connection_info):

    def setUp(self):

        def fake_execute(*cmd, **kwargs):

    def test__open_volume(self):

    def test_attach_volume(self):

    def test__close_volume(self):

    def test_detach_volume(self):

\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_luks.py

class LuksEncryptorTestCase(test_cryptsetup.CryptsetupEncryptorTestCase):

    def _create(self, connection_info):

    def setUp(self):

    def test__format_volume(self):

    def test__open_volume(self):

    def test_attach_volume(self):

    def test__close_volume(self):

    def test_detach_volume(self):

\OpenStack\nova-2014.1\nova\tests\volume\encryptors\test_nop.py

class NoOpEncryptorTestCase(test_base.VolumeEncryptorTestCase):

    def _create(self, connection_info):

    def setUp(self):

    def test_attach_volume(self):

    def test_detach_volume(self):

\OpenStack\nova-2014.1\nova\tests\volume\encryptors\__init__.py

\OpenStack\nova-2014.1\nova\tests\volume\test_cinder.py

class FakeCinderClient(object):

    def __init__(self):

class CinderApiTestCase(test.NoDBTestCase):

    def setUp(self):

    def test_get(self):

    def test_get_failed(self):

    def test_create(self):

    def test_create_failed(self):

    def test_get_all(self):

    def test_check_attach_volume_status_error(self):

    def test_check_attach_volume_already_attached(self):

    def test_check_attach_availability_zone_differs(self):

    def test_check_attach(self):

    def test_check_detach(self):

    def test_reserve_volume(self):

    def test_unreserve_volume(self):

    def test_begin_detaching(self):

    def test_roll_detaching(self):

    def test_attach(self):

    def test_detach(self):

    def test_initialize_connection(self):

    def test_terminate_connection(self):

    def test_delete(self):

    def test_update(self):

    def test_get_snapshot(self):

    def test_get_snapshot_failed(self):

    def test_get_all_snapshots(self):

    def test_create_snapshot(self):

    def test_create_force(self):

    def test_delete_snapshot(self):

    def test_get_volume_metadata(self):

    def test_get_volume_metadata_value(self):

    def test_delete_volume_metadata(self):

    def test_update_volume_metadata(self):

    def test_update_snapshot_status(self):

    def test_get_volume_encryption_metadata(self):

\OpenStack\nova-2014.1\nova\tests\volume\__init__.py

\OpenStack\nova-2014.1\nova\tests\__init__.py

\OpenStack\nova-2014.1\nova\utils.py

def vpn_ping(address, port, timeout=0.05, session_id=None):

def _get_root_helper():

def execute(*cmd, **kwargs):

def trycmd(*args, **kwargs):

def novadir():

def generate_uid(topic, size=8):

def last_completed_audit_period(unit=None, before=None):

def generate_password(length=None, symbolgroups=DEFAULT_PASSWORD_SYMBOLS):

def get_my_ipv4_address():

def _get_ipv4_address_for_interface(iface):

def get_my_linklocal(interface):

class LazyPluggable(object):

    def __init__(self, pivot, config_group=None, **backends):

    def __get_backend(self):

    def __getattr__(self, key):

def xhtml_escape(value):

def utf8(value):

def check_isinstance(obj, cls):

def parse_server_string(server_str):

def is_int_like(val):

def is_valid_ipv4(address):

def is_valid_ipv6(address):

def is_valid_ip_address(address):

def is_valid_ipv6_cidr(address):

def get_shortened_ipv6(address):

def get_shortened_ipv6_cidr(address):

def is_valid_cidr(address):

def get_ip_version(network):

def monkey_patch():

def convert_to_list_dict(lst, label):

def make_dev_path(dev, partition=None, base='/dev'):

def sanitize_hostname(hostname):

def read_cached_file(filename, cache_info, reload_func=None):

def temporary_mutation(obj, **kwargs):

    def is_dict_like(thing):

    def get(thing, attr, default):

    def set_value(thing, attr, val):

    def delete(thing, attr):

def generate_mac_address():

def read_file_as_root(file_path):

def temporary_chown(path, owner_uid=None):

def chown(path, owner_uid=None):

def tempdir(**kwargs):

def walk_class_hierarchy(clazz, encountered=None):

class UndoManager(object):

    def __init__(self):

    def undo_with(self, undo_func):

    def _rollback(self):

    def rollback_and_reraise(self, msg=None, **kwargs):

def mkfs(fs, path, label=None, run_as_root=False):

def last_bytes(file_like_object, num):

def metadata_to_dict(metadata):

def dict_to_metadata(metadata):

def instance_meta(instance):

def instance_sys_meta(instance):

def get_wrapped_function(function):

    def _get_wrapped_function(function):

def expects_func_args(*args):

    def _decorator_checker(dec):

        def _decorator(f):

class ExceptionHelper(object):

    def __init__(self, target):

    def __getattr__(self, name):

        def wrapper(*args, **kwargs):

\OpenStack\nova-2014.1\nova\version.py

def _load_config():

def vendor_string():

def product_string():

def package_string():

def version_string_with_package():

\OpenStack\nova-2014.1\nova\virt\baremetal\baremetal_states.py

\OpenStack\nova-2014.1\nova\virt\baremetal\base.py

class NodeDriver(object):

    def __init__(self, virtapi):

    def cache_images(self, context, node, instance, **kwargs):

    def destroy_images(self, context, node, instance):

    def activate_bootloader(self, context, node, instance, **kwargs):

    def deactivate_bootloader(self, context, node, instance):

    def activate_node(self, context, node, instance):

    def deactivate_node(self, context, node, instance):

    def get_console_output(self, node, instance):

    def dhcp_options_for_instance(self, instance):

class PowerManager(object):

    def __init__(self, **kwargs):

    def activate_node(self):

    def reboot_node(self):

    def deactivate_node(self):

    def is_power_on(self):

    def start_console(self):

    def stop_console(self):

\OpenStack\nova-2014.1\nova\virt\baremetal\common.py

class ConnectionFailed(exception.NovaException):

class Connection(object):

    def __init__(self, host, username, password, port=22, keyfile=None):

def ssh_connect(connection):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\api.py

def bm_node_get_all(context, service_host=None):

def bm_node_get_associated(context, service_host=None):

def bm_node_get_unassociated(context, service_host=None):

def bm_node_find_free(context, service_host=None, memory_mb=None, cpus=None, local_gb=None):

def bm_node_get(context, bm_node_id):

def bm_node_get_by_instance_uuid(context, instance_uuid):

def bm_node_get_by_node_uuid(context, node_uuid):

def bm_node_create(context, values):

def bm_node_destroy(context, bm_node_id):

def bm_node_update(context, bm_node_id, values):

def bm_node_associate_and_update(context, node_uuid, values):

def bm_interface_get(context, if_id):

def bm_interface_get_all(context):

def bm_interface_destroy(context, if_id):

def bm_interface_create(context, bm_node_id, address, datapath_id, port_no):

def bm_interface_set_vif_uuid(context, if_id, vif_uuid):

def bm_interface_get_by_vif_uuid(context, vif_uuid):

def bm_interface_get_all_by_bm_node_id(context, bm_node_id):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\migration.py

def db_sync(version=None):

def db_version():

def db_initial_version():

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\api.py

def model_query(context, *args, **kwargs):

def _save(ref, session=None):

def _build_node_order_by(query):

def bm_node_get_all(context, service_host=None):

def bm_node_get_associated(context, service_host=None):

def bm_node_get_unassociated(context, service_host=None):

def bm_node_find_free(context, service_host=None, cpus=None, memory_mb=None, local_gb=None):

def bm_node_get(context, bm_node_id):

def bm_node_get_by_instance_uuid(context, instance_uuid):

def bm_node_get_by_node_uuid(context, bm_node_uuid):

def bm_node_create(context, values):

def bm_node_update(context, bm_node_id, values):

def bm_node_associate_and_update(context, node_uuid, values):

def bm_node_destroy(context, bm_node_id):

def bm_interface_get(context, if_id):

def bm_interface_get_all(context):

def bm_interface_destroy(context, if_id):

def bm_interface_create(context, bm_node_id, address, datapath_id, port_no):

def bm_interface_set_vif_uuid(context, if_id, vif_uuid):

def bm_interface_get_by_vif_uuid(context, vif_uuid):

def bm_interface_get_all_by_bm_node_id(context, bm_node_id):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\001_init.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\002_drop_bm_deployments.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\003_add_uuid_to_bm_nodes.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\004_add_instance_name_to_bm_nodes.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\005_drop_unused_columns_from_nodes.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\006_move_prov_mac_address.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\007_drop_prov_mac_address.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\008_remove_bm_pxe_ips_table.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\009_add_ephemeral_mb_to_bm_nodes.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\010_add_preserve_ephemeral.py

def upgrade(migrate_engine):

def downgrade(migrate_engine):

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\versions\__init__.py

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migrate_repo\__init__.py

\OpenStack\nova-2014.1\nova\virt\baremetal\db\sqlalchemy\migration.py

def db_sync(version=None):

def db_version():

OpenStack Index

Previous

Next