¡@

Home 

OpenStack Study: host_manager.py

OpenStack Index

**** CubicPower OpenStack Study ****

# Copyright (c) 2011 OpenStack Foundation

# All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

"""

Manage hosts in the current zone.

"""

import collections

import UserDict

from oslo.config import cfg

from nova.compute import task_states

from nova.compute import vm_states

from nova import db

from nova import exception

from nova.openstack.common.gettextutils import _

from nova.openstack.common import jsonutils

from nova.openstack.common import log as logging

from nova.openstack.common import timeutils

from nova.pci import pci_request

from nova.pci import pci_stats

from nova.scheduler import filters

from nova.scheduler import weights

host_manager_opts = [

cfg.MultiStrOpt('scheduler_available_filters',

default=['nova.scheduler.filters.all_filters'],

help='Filter classes available to the scheduler which may '

'be specified more than once. An entry of '

'"nova.scheduler.filters.standard_filters" '

'maps to all filters included with nova.'),

cfg.ListOpt('scheduler_default_filters',

default=[

'RetryFilter',

'AvailabilityZoneFilter',

'RamFilter',

'ComputeFilter',

'ComputeCapabilitiesFilter',

'ImagePropertiesFilter',

'ServerGroupAntiAffinityFilter',

'ServerGroupAffinityFilter',

],

help='Which filter class names to use for filtering hosts '

'when not specified in the request.'),

cfg.ListOpt('scheduler_weight_classes',

default=['nova.scheduler.weights.all_weighers'],

help='Which weight class names to use for weighing hosts'),

]

CONF = cfg.CONF

CONF.register_opts(host_manager_opts)

LOG = logging.getLogger(__name__)

**** CubicPower OpenStack Study ****

class ReadOnlyDict(UserDict.IterableUserDict):

"""A read-only dict."""

**** CubicPower OpenStack Study ****

    def __init__(self, source=None):

        self.data = {}

        self.update(source)

**** CubicPower OpenStack Study ****

    def __setitem__(self, key, item):

        raise TypeError()

**** CubicPower OpenStack Study ****

    def __delitem__(self, key):

        raise TypeError()

**** CubicPower OpenStack Study ****

    def clear(self):

        raise TypeError()

**** CubicPower OpenStack Study ****

    def pop(self, key, *args):

        raise TypeError()

**** CubicPower OpenStack Study ****

    def popitem(self):

        raise TypeError()

**** CubicPower OpenStack Study ****

    def update(self, source=None):

        if source is None:

            return

        elif isinstance(source, UserDict.UserDict):

            self.data = source.data

        elif isinstance(source, type({})):

            self.data = source

        else:

            raise TypeError()

# Representation of a single metric value from a compute node.

MetricItem = collections.namedtuple(

             'MetricItem', ['value', 'timestamp', 'source'])

**** CubicPower OpenStack Study ****

class HostState(object):

"""Mutable and immutable information tracked for a host.

This is an attempt to remove the ad-hoc data structures

previously used and lock down access.

"""

**** CubicPower OpenStack Study ****

    def __init__(self, host, node, capabilities=None, service=None):

        self.host = host

        self.nodename = node

        self.update_capabilities(capabilities, service)

        # Mutable available resources.

        # These will change as resources are virtually "consumed".

        self.total_usable_ram_mb = 0

        self.total_usable_disk_gb = 0

        self.disk_mb_used = 0

        self.free_ram_mb = 0

        self.free_disk_mb = 0

        self.vcpus_total = 0

        self.vcpus_used = 0

        # Additional host information from the compute node stats:

        self.vm_states = {}

        self.task_states = {}

        self.num_instances = 0

        self.num_instances_by_project = {}

        self.num_instances_by_os_type = {}

        self.num_io_ops = 0

        # Other information

        self.host_ip = None

        self.hypervisor_type = None

        self.hypervisor_version = None

        self.hypervisor_hostname = None

        self.cpu_info = None

        self.supported_instances = None

        # Resource oversubscription values for the compute host:

        self.limits = {}

        # Generic metrics from compute nodes

        self.metrics = {}

        self.updated = None

**** CubicPower OpenStack Study ****

    def update_capabilities(self, capabilities=None, service=None):

        # Read-only capability dicts

        if capabilities is None:

            capabilities = {}

        self.capabilities = ReadOnlyDict(capabilities)

        if service is None:

            service = {}

        self.service = ReadOnlyDict(service)

**** CubicPower OpenStack Study ****

    def _update_metrics_from_compute_node(self, compute):

        #NOTE(llu): The 'or []' is to avoid json decode failure of None

        #           returned from compute.get, because DB schema allows

        #           NULL in the metrics column

        metrics = compute.get('metrics', []) or []

        if metrics:

            metrics = jsonutils.loads(metrics)

        for metric in metrics:

            # 'name', 'value', 'timestamp' and 'source' are all required

            # to be valid keys, just let KeyError happen if any one of

            # them is missing. But we also require 'name' to be True.

            name = metric['name']

            item = MetricItem(value=metric['value'],

                              timestamp=metric['timestamp'],

                              source=metric['source'])

            if name:

                self.metrics[name] = item

            else:

                LOG.warn(_("Metric name unknown of %r") % item)

**** CubicPower OpenStack Study ****

    def update_from_compute_node(self, compute):

        """Update information about a host from its compute_node info."""

        if (self.updated and compute['updated_at']

                and self.updated > compute['updated_at']):

            return

        all_ram_mb = compute['memory_mb']

        # Assume virtual size is all consumed by instances if use qcow2 disk.

        free_gb = compute['free_disk_gb']

        least_gb = compute.get('disk_available_least')

        if least_gb is not None:

            if least_gb > free_gb:

                # can occur when an instance in database is not on host

                LOG.warn(_("Host has more disk space than database expected"

                           " (%(physical)sgb > %(database)sgb)") %

                         {'physical': least_gb, 'database': free_gb})

            free_gb = min(least_gb, free_gb)

        free_disk_mb = free_gb * 1024

        self.disk_mb_used = compute['local_gb_used'] * 1024

        #NOTE(jogo) free_ram_mb can be negative

        self.free_ram_mb = compute['free_ram_mb']

        self.total_usable_ram_mb = all_ram_mb

        self.total_usable_disk_gb = compute['local_gb']

        self.free_disk_mb = free_disk_mb

        self.vcpus_total = compute['vcpus']

        self.vcpus_used = compute['vcpus_used']

        self.updated = compute['updated_at']

        if 'pci_stats' in compute:

            self.pci_stats = pci_stats.PciDeviceStats(compute['pci_stats'])

        else:

            self.pci_stats = None

        # All virt drivers report host_ip

        self.host_ip = compute['host_ip']

        self.hypervisor_type = compute.get('hypervisor_type')

        self.hypervisor_version = compute.get('hypervisor_version')

        self.hypervisor_hostname = compute.get('hypervisor_hostname')

        self.cpu_info = compute.get('cpu_info')

        if compute.get('supported_instances'):

            self.supported_instances = jsonutils.loads(

                    compute.get('supported_instances'))

        # Don't store stats directly in host_state to make sure these don't

        # overwrite any values, or get overwritten themselves. Store in self so

        # filters can schedule with them.

        stats = compute.get('stats', None) or '{}'

        self.stats = jsonutils.loads(stats)

        self.hypervisor_version = compute['hypervisor_version']

        # Track number of instances on host

        self.num_instances = int(self.stats.get('num_instances', 0))

        # Track number of instances by project_id

        project_id_keys = [k for k in self.stats.keys() if

                k.startswith("num_proj_")]

        for key in project_id_keys:

            project_id = key[9:]

            self.num_instances_by_project[project_id] = int(self.stats[key])

        # Track number of instances in certain vm_states

        vm_state_keys = [k for k in self.stats.keys() if

                k.startswith("num_vm_")]

        for key in vm_state_keys:

            vm_state = key[7:]

            self.vm_states[vm_state] = int(self.stats[key])

        # Track number of instances in certain task_states

        task_state_keys = [k for k in self.stats.keys() if

                k.startswith("num_task_")]

        for key in task_state_keys:

            task_state = key[9:]

            self.task_states[task_state] = int(self.stats[key])

        # Track number of instances by host_type

        os_keys = [k for k in self.stats.keys() if

                k.startswith("num_os_type_")]

        for key in os_keys:

            os = key[12:]

            self.num_instances_by_os_type[os] = int(self.stats[key])

        self.num_io_ops = int(self.stats.get('io_workload', 0))

        # update metrics

        self._update_metrics_from_compute_node(compute)

**** CubicPower OpenStack Study ****

    def consume_from_instance(self, instance):

        """Incrementally update host state from an instance."""

        disk_mb = (instance['root_gb'] + instance['ephemeral_gb']) * 1024

        ram_mb = instance['memory_mb']

        vcpus = instance['vcpus']

        self.free_ram_mb -= ram_mb

        self.free_disk_mb -= disk_mb

        self.vcpus_used += vcpus

        self.updated = timeutils.utcnow()

        # Track number of instances on host

        self.num_instances += 1

        # Track number of instances by project_id

        project_id = instance.get('project_id')

        if project_id not in self.num_instances_by_project:

            self.num_instances_by_project[project_id] = 0

        self.num_instances_by_project[project_id] += 1

        # Track number of instances in certain vm_states

        vm_state = instance.get('vm_state', vm_states.BUILDING)

        if vm_state not in self.vm_states:

            self.vm_states[vm_state] = 0

        self.vm_states[vm_state] += 1

        # Track number of instances in certain task_states

        task_state = instance.get('task_state')

        if task_state not in self.task_states:

            self.task_states[task_state] = 0

        self.task_states[task_state] += 1

        # Track number of instances by host_type

        os_type = instance.get('os_type')

        if os_type not in self.num_instances_by_os_type:

            self.num_instances_by_os_type[os_type] = 0

        self.num_instances_by_os_type[os_type] += 1

        pci_requests = pci_request.get_instance_pci_requests(instance)

        if pci_requests and self.pci_stats:

            self.pci_stats.apply_requests(pci_requests)

        vm_state = instance.get('vm_state', vm_states.BUILDING)

        task_state = instance.get('task_state')

        if vm_state == vm_states.BUILDING or task_state in [

                task_states.RESIZE_MIGRATING, task_states.REBUILDING,

                task_states.RESIZE_PREP, task_states.IMAGE_SNAPSHOT,

                task_states.IMAGE_BACKUP]:

            self.num_io_ops += 1

**** CubicPower OpenStack Study ****

    def __repr__(self):

        return ("(%s, %s) ram:%s disk:%s io_ops:%s instances:%s" %

                (self.host, self.nodename, self.free_ram_mb, self.free_disk_mb,

                 self.num_io_ops, self.num_instances))

**** CubicPower OpenStack Study ****

class HostManager(object):

"""Base HostManager class."""

# Can be overridden in a subclass

host_state_cls = HostState

**** CubicPower OpenStack Study ****

    def __init__(self):

        # { (host, hypervisor_hostname) : {  : { cap k : v }}}

        self.service_states = {}

        self.host_state_map = {}

        self.filter_handler = filters.HostFilterHandler()

        self.filter_classes = self.filter_handler.get_matching_classes(

                CONF.scheduler_available_filters)

        self.weight_handler = weights.HostWeightHandler()

        self.weight_classes = self.weight_handler.get_matching_classes(

                CONF.scheduler_weight_classes)

**** CubicPower OpenStack Study ****

    def _choose_host_filters(self, filter_cls_names):

        """Since the caller may specify which filters to use we need

        to have an authoritative list of what is permissible. This

        function checks the filter names against a predefined set

        of acceptable filters.

        """

        if filter_cls_names is None:

            filter_cls_names = CONF.scheduler_default_filters

        if not isinstance(filter_cls_names, (list, tuple)):

            filter_cls_names = [filter_cls_names]

        cls_map = dict((cls.__name__, cls) for cls in self.filter_classes)

        good_filters = []

        bad_filters = []

        for filter_name in filter_cls_names:

            if filter_name not in cls_map:

                bad_filters.append(filter_name)

                continue

            good_filters.append(cls_map[filter_name])

        if bad_filters:

            msg = ", ".join(bad_filters)

            raise exception.SchedulerHostFilterNotFound(filter_name=msg)

        return good_filters

**** CubicPower OpenStack Study ****

    def get_filtered_hosts(self, hosts, filter_properties,

            filter_class_names=None, index=0):

        """Filter hosts and return only ones passing all filters."""

        def _strip_ignore_hosts(host_map, hosts_to_ignore):

            ignored_hosts = []

            for host in hosts_to_ignore:

                for (hostname, nodename) in host_map.keys():

                    if host == hostname:

                        del host_map[(hostname, nodename)]

                        ignored_hosts.append(host)

            ignored_hosts_str = ', '.join(ignored_hosts)

            msg = _('Host filter ignoring hosts: %s')

            LOG.audit(msg % ignored_hosts_str)

        def _match_forced_hosts(host_map, hosts_to_force):

            forced_hosts = []

            for (hostname, nodename) in host_map.keys():

                if hostname not in hosts_to_force:

                    del host_map[(hostname, nodename)]

                else:

                    forced_hosts.append(hostname)

            if host_map:

                forced_hosts_str = ', '.join(forced_hosts)

                msg = _('Host filter forcing available hosts to %s')

            else:

                forced_hosts_str = ', '.join(hosts_to_force)

                msg = _("No hosts matched due to not matching "

                        "'force_hosts' value of '%s'")

            LOG.audit(msg % forced_hosts_str)

        def _match_forced_nodes(host_map, nodes_to_force):

            forced_nodes = []

            for (hostname, nodename) in host_map.keys():

                if nodename not in nodes_to_force:

                    del host_map[(hostname, nodename)]

                else:

                    forced_nodes.append(nodename)

            if host_map:

                forced_nodes_str = ', '.join(forced_nodes)

                msg = _('Host filter forcing available nodes to %s')

            else:

                forced_nodes_str = ', '.join(nodes_to_force)

                msg = _("No nodes matched due to not matching "

                        "'force_nodes' value of '%s'")

            LOG.audit(msg % forced_nodes_str)

        filter_classes = self._choose_host_filters(filter_class_names)

        ignore_hosts = filter_properties.get('ignore_hosts', [])

        force_hosts = filter_properties.get('force_hosts', [])

        force_nodes = filter_properties.get('force_nodes', [])

        if ignore_hosts or force_hosts or force_nodes:

            # NOTE(deva): we can't assume "host" is unique because

            #             one host may have many nodes.

            name_to_cls_map = dict([((x.host, x.nodename), x) for x in hosts])

            if ignore_hosts:

                _strip_ignore_hosts(name_to_cls_map, ignore_hosts)

                if not name_to_cls_map:

                    return []

            # NOTE(deva): allow force_hosts and force_nodes independently

            if force_hosts:

                _match_forced_hosts(name_to_cls_map, force_hosts)

            if force_nodes:

                _match_forced_nodes(name_to_cls_map, force_nodes)

            if force_hosts or force_nodes:

                # NOTE(deva): Skip filters when forcing host or node

                if name_to_cls_map:

                    return name_to_cls_map.values()

            hosts = name_to_cls_map.itervalues()

        return self.filter_handler.get_filtered_objects(filter_classes,

                hosts, filter_properties, index)

**** CubicPower OpenStack Study ****

        def _strip_ignore_hosts(host_map, hosts_to_ignore):

            ignored_hosts = []

            for host in hosts_to_ignore:

                for (hostname, nodename) in host_map.keys():

                    if host == hostname:

                        del host_map[(hostname, nodename)]

                        ignored_hosts.append(host)

            ignored_hosts_str = ', '.join(ignored_hosts)

            msg = _('Host filter ignoring hosts: %s')

            LOG.audit(msg % ignored_hosts_str)

**** CubicPower OpenStack Study ****

        def _match_forced_hosts(host_map, hosts_to_force):

            forced_hosts = []

            for (hostname, nodename) in host_map.keys():

                if hostname not in hosts_to_force:

                    del host_map[(hostname, nodename)]

                else:

                    forced_hosts.append(hostname)

            if host_map:

                forced_hosts_str = ', '.join(forced_hosts)

                msg = _('Host filter forcing available hosts to %s')

            else:

                forced_hosts_str = ', '.join(hosts_to_force)

                msg = _("No hosts matched due to not matching "

                        "'force_hosts' value of '%s'")

            LOG.audit(msg % forced_hosts_str)

**** CubicPower OpenStack Study ****

        def _match_forced_nodes(host_map, nodes_to_force):

            forced_nodes = []

            for (hostname, nodename) in host_map.keys():

                if nodename not in nodes_to_force:

                    del host_map[(hostname, nodename)]

                else:

                    forced_nodes.append(nodename)

            if host_map:

                forced_nodes_str = ', '.join(forced_nodes)

                msg = _('Host filter forcing available nodes to %s')

            else:

                forced_nodes_str = ', '.join(nodes_to_force)

                msg = _("No nodes matched due to not matching "

                        "'force_nodes' value of '%s'")

            LOG.audit(msg % forced_nodes_str)

        filter_classes = self._choose_host_filters(filter_class_names)

        ignore_hosts = filter_properties.get('ignore_hosts', [])

        force_hosts = filter_properties.get('force_hosts', [])

        force_nodes = filter_properties.get('force_nodes', [])

        if ignore_hosts or force_hosts or force_nodes:

            # NOTE(deva): we can't assume "host" is unique because

            #             one host may have many nodes.

            name_to_cls_map = dict([((x.host, x.nodename), x) for x in hosts])

            if ignore_hosts:

                _strip_ignore_hosts(name_to_cls_map, ignore_hosts)

                if not name_to_cls_map:

                    return []

            # NOTE(deva): allow force_hosts and force_nodes independently

            if force_hosts:

                _match_forced_hosts(name_to_cls_map, force_hosts)

            if force_nodes:

                _match_forced_nodes(name_to_cls_map, force_nodes)

            if force_hosts or force_nodes:

                # NOTE(deva): Skip filters when forcing host or node

                if name_to_cls_map:

                    return name_to_cls_map.values()

            hosts = name_to_cls_map.itervalues()

        return self.filter_handler.get_filtered_objects(filter_classes,

                hosts, filter_properties, index)

**** CubicPower OpenStack Study ****

    def get_weighed_hosts(self, hosts, weight_properties):

        """Weigh the hosts."""

        return self.weight_handler.get_weighed_objects(self.weight_classes,

                hosts, weight_properties)

**** CubicPower OpenStack Study ****

    def get_all_host_states(self, context):

        """Returns a list of HostStates that represents all the hosts

        the HostManager knows about. Also, each of the consumable resources

        in HostState are pre-populated and adjusted based on data in the db.

        """

        # Get resource usage across the available compute nodes:

        compute_nodes = db.compute_node_get_all(context)

        seen_nodes = set()

        for compute in compute_nodes:

            service = compute['service']

            if not service:

                LOG.warn(_("No service for compute ID %s") % compute['id'])

                continue

            host = service['host']

            node = compute.get('hypervisor_hostname')

            state_key = (host, node)

            capabilities = self.service_states.get(state_key, None)

            host_state = self.host_state_map.get(state_key)

            if host_state:

                host_state.update_capabilities(capabilities,

                                               dict(service.iteritems()))

            else:

                host_state = self.host_state_cls(host, node,

                        capabilities=capabilities,

                        service=dict(service.iteritems()))

                self.host_state_map[state_key] = host_state

            host_state.update_from_compute_node(compute)

            seen_nodes.add(state_key)

        # remove compute nodes from host_state_map if they are not active

        dead_nodes = set(self.host_state_map.keys()) - seen_nodes

        for state_key in dead_nodes:

            host, node = state_key

            LOG.info(_("Removing dead compute node %(host)s:%(node)s "

                       "from scheduler") % {'host': host, 'node': node})

            del self.host_state_map[state_key]

        return self.host_state_map.itervalues()