¡@

Home 

OpenStack Study: hyperv_neutron_agent.py

OpenStack Index

**** CubicPower OpenStack Study ****

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#Copyright 2013 Cloudbase Solutions SRL

#Copyright 2013 Pedro Navarro Perez

#All Rights Reserved.

#

# Licensed under the Apache License, Version 2.0 (the "License"); you may

# not use this file except in compliance with the License. You may obtain

# a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

# License for the specific language governing permissions and limitations

# under the License.

# @author: Pedro Navarro Perez

# @author: Alessandro Pilotti, Cloudbase Solutions Srl

import eventlet

import platform

import re

import time

from oslo.config import cfg

from neutron.agent.common import config

from neutron.agent import rpc as agent_rpc

from neutron.agent import securitygroups_rpc as sg_rpc

from neutron.common import config as logging_config

from neutron.common import constants as n_const

from neutron.common import topics

from neutron import context

from neutron.openstack.common import log as logging

from neutron.openstack.common import loopingcall

from neutron.openstack.common.rpc import dispatcher

from neutron.plugins.common import constants as p_const

from neutron.plugins.hyperv.agent import utils

from neutron.plugins.hyperv.agent import utilsfactory

from neutron.plugins.hyperv.common import constants

LOG = logging.getLogger(__name__)

agent_opts = [

cfg.ListOpt(

'physical_network_vswitch_mappings',

default=[],

help=_('List of : '

'where the physical networks can be expressed with '

'wildcards, e.g.: ."*:external"')),

cfg.StrOpt(

'local_network_vswitch',

default='private',

help=_('Private vswitch name used for local networks')),

cfg.IntOpt('polling_interval', default=2,

help=_("The number of seconds the agent will wait between "

"polling for local device changes.")),

cfg.BoolOpt('enable_metrics_collection',

default=False,

help=_('Enables metrics collections for switch ports by using '

'Hyper-V\'s metric APIs. Collected data can by '

'retrieved by other apps and services, e.g.: '

'Ceilometer. Requires Hyper-V / Windows Server 2012 '

'and above')),

cfg.IntOpt('metrics_max_retries',

default=100,

help=_('Specifies the maximum number of retries to enable '

'Hyper-V\'s port metrics collection. The agent will try '

'to enable the feature once every polling_interval '

'period for at most metrics_max_retries or until it '

'succeedes.'))

]

CONF = cfg.CONF

CONF.register_opts(agent_opts, "AGENT")

config.register_agent_state_opts_helper(cfg.CONF)

**** CubicPower OpenStack Study ****

class HyperVSecurityAgent(sg_rpc.SecurityGroupAgentRpcMixin):

# Set RPC API version to 1.1 by

**** CubicPower OpenStack Study ****

    def __init__(self, context, plugin_rpc):

        self.context = context

        self.plugin_rpc = plugin_rpc

        self.init_firewall()

        if sg_rpc.is_firewall_enabled():

            self._setup_rpc()

**** CubicPower OpenStack Study ****

    def _setup_rpc(self):

        self.topic = topics.AGENT

        self.dispatcher = self._create_rpc_dispatcher()

        consumers = [[topics.SECURITY_GROUP, topics.UPDATE]]

        self.connection = agent_rpc.create_consumers(self.dispatcher,

                                                     self.topic,

                                                     consumers)

**** CubicPower OpenStack Study ****

    def _create_rpc_dispatcher(self):

        rpc_callback = HyperVSecurityCallbackMixin(self)

        return dispatcher.RpcDispatcher([rpc_callback])

**** CubicPower OpenStack Study ****

class HyperVSecurityCallbackMixin(sg_rpc.SecurityGroupAgentRpcCallbackMixin):

# Set RPC API version to 1.1 by

**** CubicPower OpenStack Study ****

    def __init__(self, sg_agent):

        self.sg_agent = sg_agent

**** CubicPower OpenStack Study ****

class HyperVPluginApi(agent_rpc.PluginApi, sg_rpc.SecurityGroupServerRpcApiMixin):

pass

**** CubicPower OpenStack Study ****

class HyperVNeutronAgent(object):

# Set RPC API version to 1.0 by

**** CubicPower OpenStack Study ****

    def __init__(self):

        self._utils = utilsfactory.get_hypervutils()

        self._polling_interval = CONF.AGENT.polling_interval

        self._load_physical_network_mappings()

        self._network_vswitch_map = {}

        self._port_metric_retries = {}

        self._set_agent_state()

        self._setup_rpc()

**** CubicPower OpenStack Study ****

    def _set_agent_state(self):

        self.agent_state = {

            'binary': 'neutron-hyperv-agent',

            'host': cfg.CONF.host,

            'topic': n_const.L2_AGENT_TOPIC,

            'configurations': {'vswitch_mappings':

                               self._physical_network_mappings},

            'agent_type': n_const.AGENT_TYPE_HYPERV,

            'start_flag': True}

**** CubicPower OpenStack Study ****

    def _report_state(self):

        try:

            self.state_rpc.report_state(self.context,

                                        self.agent_state)

            self.agent_state.pop('start_flag', None)

        except Exception as ex:

            LOG.exception(_("Failed reporting state! %s"), ex)

**** CubicPower OpenStack Study ****

    def _setup_rpc(self):

        self.agent_id = 'hyperv_%s' % platform.node()

        self.topic = topics.AGENT

        self.plugin_rpc = HyperVPluginApi(topics.PLUGIN)

        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN)

        # RPC network init

        self.context = context.get_admin_context_without_session()

        # Handle updates from service

        self.dispatcher = self._create_rpc_dispatcher()

        # Define the listening consumers for the agent

        consumers = [[topics.PORT, topics.UPDATE],

                     [topics.NETWORK, topics.DELETE],

                     [topics.PORT, topics.DELETE],

                     [constants.TUNNEL, topics.UPDATE]]

        self.connection = agent_rpc.create_consumers(self.dispatcher,

                                                     self.topic,

                                                     consumers)

        self.sec_groups_agent = HyperVSecurityAgent(

            self.context, self.plugin_rpc)

        report_interval = CONF.AGENT.report_interval

        if report_interval:

            heartbeat = loopingcall.LoopingCall(self._report_state)

            heartbeat.start(interval=report_interval)

**** CubicPower OpenStack Study ****

    def _load_physical_network_mappings(self):

        self._physical_network_mappings = {}

        for mapping in CONF.AGENT.physical_network_vswitch_mappings:

            parts = mapping.split(':')

            if len(parts) != 2:

                LOG.debug(_('Invalid physical network mapping: %s'), mapping)

            else:

                pattern = re.escape(parts[0].strip()).replace('\\*', '.*')

                vswitch = parts[1].strip()

                self._physical_network_mappings[pattern] = vswitch

**** CubicPower OpenStack Study ****

    def _get_vswitch_for_physical_network(self, phys_network_name):

        for pattern in self._physical_network_mappings:

            if phys_network_name is None:

                phys_network_name = ''

            if re.match(pattern, phys_network_name):

                return self._physical_network_mappings[pattern]

        # Not found in the mappings, the vswitch has the same name

        return phys_network_name

**** CubicPower OpenStack Study ****

    def _get_network_vswitch_map_by_port_id(self, port_id):

        for network_id, map in self._network_vswitch_map.iteritems():

            if port_id in map['ports']:

                return (network_id, map)

**** CubicPower OpenStack Study ****

    def network_delete(self, context, network_id=None):

        LOG.debug(_("network_delete received. "

                    "Deleting network %s"), network_id)

        # The network may not be defined on this agent

        if network_id in self._network_vswitch_map:

            self._reclaim_local_network(network_id)

        else:

            LOG.debug(_("Network %s not defined on agent."), network_id)

**** CubicPower OpenStack Study ****

    def port_delete(self, context, port_id=None):

        LOG.debug(_("port_delete received"))

        self._port_unbound(port_id)

**** CubicPower OpenStack Study ****

    def port_update(self, context, port=None, network_type=None,

                    segmentation_id=None, physical_network=None):

        LOG.debug(_("port_update received"))

        if 'security_groups' in port:

            self.sec_groups_agent.refresh_firewall()

        self._treat_vif_port(

            port['id'], port['network_id'],

            network_type, physical_network,

            segmentation_id, port['admin_state_up'])

**** CubicPower OpenStack Study ****

    def _create_rpc_dispatcher(self):

        return dispatcher.RpcDispatcher([self])

**** CubicPower OpenStack Study ****

    def _get_vswitch_name(self, network_type, physical_network):

        if network_type != p_const.TYPE_LOCAL:

            vswitch_name = self._get_vswitch_for_physical_network(

                physical_network)

        else:

            vswitch_name = CONF.AGENT.local_network_vswitch

        return vswitch_name

**** CubicPower OpenStack Study ****

    def _provision_network(self, port_id,

                           net_uuid, network_type,

                           physical_network,

                           segmentation_id):

        LOG.info(_("Provisioning network %s"), net_uuid)

        vswitch_name = self._get_vswitch_name(network_type, physical_network)

        if network_type in [p_const.TYPE_VLAN, p_const.TYPE_FLAT]:

            #Nothing to do

            pass

        elif network_type == p_const.TYPE_LOCAL:

            #TODO(alexpilotti): Check that the switch type is private

            #or create it if not existing

            pass

        else:

            raise utils.HyperVException(

                msg=(_("Cannot provision unknown network type %(network_type)s"

                       " for network %(net_uuid)s") %

                     dict(network_type=network_type, net_uuid=net_uuid)))

        map = {

            'network_type': network_type,

            'vswitch_name': vswitch_name,

            'ports': [],

            'vlan_id': segmentation_id}

        self._network_vswitch_map[net_uuid] = map

**** CubicPower OpenStack Study ****

    def _reclaim_local_network(self, net_uuid):

        LOG.info(_("Reclaiming local network %s"), net_uuid)

        del self._network_vswitch_map[net_uuid]

**** CubicPower OpenStack Study ****

    def _port_bound(self, port_id,

                    net_uuid,

                    network_type,

                    physical_network,

                    segmentation_id):

        LOG.debug(_("Binding port %s"), port_id)

        if net_uuid not in self._network_vswitch_map:

            self._provision_network(

                port_id, net_uuid, network_type,

                physical_network, segmentation_id)

        map = self._network_vswitch_map[net_uuid]

        map['ports'].append(port_id)

        self._utils.connect_vnic_to_vswitch(map['vswitch_name'], port_id)

        if network_type == p_const.TYPE_VLAN:

            LOG.info(_('Binding VLAN ID %(segmentation_id)s '

                       'to switch port %(port_id)s'),

                     dict(segmentation_id=segmentation_id, port_id=port_id))

            self._utils.set_vswitch_port_vlan_id(

                segmentation_id,

                port_id)

        elif network_type == p_const.TYPE_FLAT:

            #Nothing to do

            pass

        elif network_type == p_const.TYPE_LOCAL:

            #Nothing to do

            pass

        else:

            LOG.error(_('Unsupported network type %s'), network_type)

        if CONF.AGENT.enable_metrics_collection:

            self._utils.enable_port_metrics_collection(port_id)

            self._port_metric_retries[port_id] = CONF.AGENT.metrics_max_retries

**** CubicPower OpenStack Study ****

    def _port_unbound(self, port_id):

        (net_uuid, map) = self._get_network_vswitch_map_by_port_id(port_id)

        if net_uuid not in self._network_vswitch_map:

            LOG.info(_('Network %s is not avalailable on this agent'),

                     net_uuid)

            return

        LOG.debug(_("Unbinding port %s"), port_id)

        self._utils.disconnect_switch_port(map['vswitch_name'], port_id, True)

        if not map['ports']:

            self._reclaim_local_network(net_uuid)

**** CubicPower OpenStack Study ****

    def _port_enable_control_metrics(self):

        if not CONF.AGENT.enable_metrics_collection:

            return

        for port_id in self._port_metric_retries.keys():

            if self._utils.can_enable_control_metrics(port_id):

                self._utils.enable_control_metrics(port_id)

                LOG.info(_('Port metrics enabled for port: %s'), port_id)

                del self._port_metric_retries[port_id]

            elif self._port_metric_retries[port_id] < 1:

                self._utils.enable_control_metrics(port_id)

                LOG.error(_('Port metrics raw enabling for port: %s'), port_id)

                del self._port_metric_retries[port_id]

            else:

                self._port_metric_retries[port_id] -= 1

**** CubicPower OpenStack Study ****

    def _update_ports(self, registered_ports):

        ports = self._utils.get_vnic_ids()

        if ports == registered_ports:

            return

        added = ports - registered_ports

        removed = registered_ports - ports

        return {'current': ports,

                'added': added,

                'removed': removed}

**** CubicPower OpenStack Study ****

    def _treat_vif_port(self, port_id, network_id, network_type,

                        physical_network, segmentation_id,

                        admin_state_up):

        if self._utils.vnic_port_exists(port_id):

            if admin_state_up:

                self._port_bound(port_id, network_id, network_type,

                                 physical_network, segmentation_id)

            else:

                self._port_unbound(port_id)

        else:

            LOG.debug(_("No port %s defined on agent."), port_id)

**** CubicPower OpenStack Study ****

    def _treat_devices_added(self, devices):

        resync = False

        for device in devices:

            LOG.info(_("Adding port %s"), device)

            try:

                device_details = self.plugin_rpc.get_device_details(

                    self.context,

                    device,

                    self.agent_id)

            except Exception as e:

                LOG.debug(

                    _("Unable to get port details for "

                      "device %(device)s: %(e)s"),

                    {'device': device, 'e': e})

                resync = True

                continue

            if 'port_id' in device_details:

                LOG.info(

                    _("Port %(device)s updated. Details: %(device_details)s"),

                    {'device': device, 'device_details': device_details})

                self._treat_vif_port(

                    device_details['port_id'],

                    device_details['network_id'],

                    device_details['network_type'],

                    device_details['physical_network'],

                    device_details['segmentation_id'],

                    device_details['admin_state_up'])

                self.sec_groups_agent.prepare_devices_filter(devices)

                self.plugin_rpc.update_device_up(self.context,

                                                 device,

                                                 self.agent_id,

                                                 cfg.CONF.host)

        return resync

**** CubicPower OpenStack Study ****

    def _treat_devices_removed(self, devices):

        resync = False

        for device in devices:

            LOG.info(_("Removing port %s"), device)

            try:

                self.plugin_rpc.update_device_down(self.context,

                                                   device,

                                                   self.agent_id,

                                                   cfg.CONF.host)

            except Exception as e:

                LOG.debug(

                    _("Removing port failed for device %(device)s: %(e)s"),

                    dict(device=device, e=e))

                resync = True

                continue

            self._port_unbound(device)

        return resync

**** CubicPower OpenStack Study ****

    def _process_network_ports(self, port_info):

        resync_a = False

        resync_b = False

        if 'added' in port_info:

            resync_a = self._treat_devices_added(port_info['added'])

        if 'removed' in port_info:

            resync_b = self._treat_devices_removed(port_info['removed'])

        # If one of the above operations fails => resync with plugin

        return (resync_a | resync_b)

**** CubicPower OpenStack Study ****

    def daemon_loop(self):

        sync = True

        ports = set()

        while True:

            try:

                start = time.time()

                if sync:

                    LOG.info(_("Agent out of sync with plugin!"))

                    ports.clear()

                    sync = False

                port_info = self._update_ports(ports)

                # notify plugin about port deltas

                if port_info:

                    LOG.debug(_("Agent loop has new devices!"))

                    # If treat devices fails - must resync with plugin

                    sync = self._process_network_ports(port_info)

                    ports = port_info['current']

                self._port_enable_control_metrics()

            except Exception as e:

                LOG.exception(_("Error in agent event loop: %s"), e)

                sync = True

            # sleep till end of polling interval

            elapsed = (time.time() - start)

            if (elapsed < self._polling_interval):

                time.sleep(self._polling_interval - elapsed)

            else:

                LOG.debug(_("Loop iteration exceeded interval "

                            "(%(polling_interval)s vs. %(elapsed)s)"),

                          {'polling_interval': self._polling_interval,

                           'elapsed': elapsed})

def main():

    eventlet.monkey_patch()

    cfg.CONF(project='neutron')

    logging_config.setup_logging(cfg.CONF)

    plugin = HyperVNeutronAgent()

    # Start everything.

    LOG.info(_("Agent initialized successfully, now running... "))

    plugin.daemon_loop()

**** CubicPower OpenStack Study ****

def main():

    eventlet.monkey_patch()

    cfg.CONF(project='neutron')

    logging_config.setup_logging(cfg.CONF)

    plugin = HyperVNeutronAgent()

    # Start everything.

    LOG.info(_("Agent initialized successfully, now running... "))

    plugin.daemon_loop()