¡@

Home 

OpenStack Study: nexus_db_v2.py

OpenStack Index

**** CubicPower OpenStack Study ****

def get_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):

    """Lists a nexusport binding."""

    LOG.debug(_("get_nexusport_binding() called"))

    return _lookup_all_nexus_bindings(port_id=port_id,

                                      vlan_id=vlan_id,

                                      switch_ip=switch_ip,

                                      instance_id=instance_id)

**** CubicPower OpenStack Study ****

def get_nexusvlan_binding(vlan_id, switch_ip):

    """Lists a vlan and switch binding."""

    LOG.debug(_("get_nexusvlan_binding() called"))

    return _lookup_all_nexus_bindings(vlan_id=vlan_id, switch_ip=switch_ip)

**** CubicPower OpenStack Study ****

def add_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):

    """Adds a nexusport binding."""

    LOG.debug(_("add_nexusport_binding() called"))

    session = db.get_session()

    binding = nexus_models_v2.NexusPortBinding(port_id=port_id,

                                               vlan_id=vlan_id,

                                               switch_ip=switch_ip,

                                               instance_id=instance_id)

    session.add(binding)

    session.flush()

    return binding

**** CubicPower OpenStack Study ****

def remove_nexusport_binding(port_id, vlan_id, switch_ip, instance_id):

    """Removes a nexusport binding."""

    LOG.debug(_("remove_nexusport_binding() called"))

    session = db.get_session()

    binding = _lookup_all_nexus_bindings(session=session,

                                         vlan_id=vlan_id,

                                         switch_ip=switch_ip,

                                         port_id=port_id,

                                         instance_id=instance_id)

    for bind in binding:

        session.delete(bind)

    session.flush()

    return binding

**** CubicPower OpenStack Study ****

def update_nexusport_binding(port_id, new_vlan_id):

    """Updates nexusport binding."""

    if not new_vlan_id:

        LOG.warning(_("update_nexusport_binding called with no vlan"))

        return

    LOG.debug(_("update_nexusport_binding called"))

    session = db.get_session()

    binding = _lookup_one_nexus_binding(session=session, port_id=port_id)

    binding.vlan_id = new_vlan_id

    session.merge(binding)

    session.flush()

    return binding

**** CubicPower OpenStack Study ****

def get_nexusvm_bindings(vlan_id, instance_id):

    """Lists nexusvm bindings."""

    LOG.debug(_("get_nexusvm_binding() called"))

    return _lookup_all_nexus_bindings(vlan_id=vlan_id,

                                      instance_id=instance_id)

**** CubicPower OpenStack Study ****

def get_port_vlan_switch_binding(port_id, vlan_id, switch_ip):

    """Lists nexusvm bindings."""

    LOG.debug(_("get_port_vlan_switch_binding() called"))

    return _lookup_all_nexus_bindings(port_id=port_id,

                                      switch_ip=switch_ip,

                                      vlan_id=vlan_id)

**** CubicPower OpenStack Study ****

def get_port_switch_bindings(port_id, switch_ip):

    """List all vm/vlan bindings on a Nexus switch port."""

    LOG.debug(_("get_port_switch_bindings() called, "

                "port:'%(port_id)s', switch:'%(switch_ip)s'"),

              {'port_id': port_id, 'switch_ip': switch_ip})

    try:

        return _lookup_all_nexus_bindings(port_id=port_id,

                                          switch_ip=switch_ip)

    except c_exc.NexusPortBindingNotFound:

        pass

**** CubicPower OpenStack Study ****

def get_nexussvi_bindings():

    """Lists nexus svi bindings."""

    LOG.debug(_("get_nexussvi_bindings() called"))

    return _lookup_all_nexus_bindings(port_id='router')

**** CubicPower OpenStack Study ****

def _lookup_nexus_bindings(query_type, session=None, **bfilter):

    """Look up 'query_type' Nexus bindings matching the filter.

    :param query_type: 'all', 'one' or 'first'

    :param session: db session

    :param bfilter: filter for bindings query

    :return: bindings if query gave a result, else

             raise NexusPortBindingNotFound.

    """

    if session is None:

        session = db.get_session()

    query_method = getattr(session.query(

        nexus_models_v2.NexusPortBinding).filter_by(**bfilter), query_type)

    try:

        bindings = query_method()

        if bindings:

            return bindings

    except sa_exc.NoResultFound:

        pass

    raise c_exc.NexusPortBindingNotFound(**bfilter)

**** CubicPower OpenStack Study ****

def _lookup_all_nexus_bindings(session=None, **bfilter):

    return _lookup_nexus_bindings('all', session, **bfilter)

**** CubicPower OpenStack Study ****

def _lookup_one_nexus_binding(session=None, **bfilter):

    return _lookup_nexus_bindings('one', session, **bfilter)

**** CubicPower OpenStack Study ****

def _lookup_first_nexus_binding(session=None, **bfilter):

    return _lookup_nexus_bindings('first', session, **bfilter)